/*-------------------------------------------------------------------------
 *
 * lock.c
 *      POSTGRES primary lock mechanism
 *
 * Portions Copyright (c) 2012-2014, TransLattice, Inc.
 * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * This source code file contains modifications made by THL A29 Limited ("Tencent Modifications").
 * All Tencent Modifications are Copyright (C) 2023 THL A29 Limited.
 *
 * IDENTIFICATION
 *      src/backend/storage/lmgr/lock.c
 *
 * NOTES
 *      A lock table is a shared memory hash table.  When
 *      a process tries to acquire a lock of a type that conflicts
 *      with existing locks, it is put to sleep using the routines
 *      in storage/lmgr/proc.c.
 *
 *      For the most part, this code should be invoked via lmgr.c
 *      or another lock-management module, not directly.
 *
 *    Interface:
 *
 *    InitLocks(), GetLocksMethodTable(), GetLockTagsMethodTable(),
 *    LockAcquire(), LockRelease(), LockReleaseAll(),
 *    LockCheckConflicts(), GrantLock()
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include <signal.h>
#include <unistd.h>

#include "access/transam.h"
#include "access/twophase.h"
#include "access/twophase_rmgr.h"
#include "access/xact.h"
#include "access/xlog.h"
#include "miscadmin.h"
#include "pg_trace.h"
#include "pgstat.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/sinvaladt.h"
#include "storage/spin.h"
#include "storage/standby.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
#include "utils/resowner_private.h"


/* This configuration variable is used to set the lock table size */
int            max_locks_per_xact; /* set by guc.c */

/*
 * NLOCKENTS -- number of entries requested for the shared LOCK hash table:
 * max_locks_per_xact times (MaxBackends + max_prepared_xacts), with the sum
 * and product formed via add_size()/mul_size().  InitLocks() uses the result
 * as the maximum size of the "LOCK hash" table (and twice that for the
 * "PROCLOCK hash" table).
 */
#define NLOCKENTS() \
    mul_size(max_locks_per_xact, add_size(MaxBackends, max_prepared_xacts))


/*
 * Data structures defining the semantics of the standard lock methods.
 *
 * The conflict table defines the semantics of the various lock modes.
 *
 * The table is indexed by lock mode number.  Each entry is a bitmask, built
 * with LOCKBIT_ON(), of the lock modes that conflict with the mode named in
 * the comment above that entry.  Callers test it as
 * conflictTab[mode] & LOCKBIT_ON(otherMode).
 */
static const LOCKMASK LockConflicts[] = {
    /* slot 0: not a real lock mode (see "INVALID" in lock_mode_names) */
    0,

    /* AccessShareLock */
    LOCKBIT_ON(AccessExclusiveLock),

    /* RowShareLock */
    LOCKBIT_ON(ExclusiveLock) | LOCKBIT_ON(AccessExclusiveLock),

    /* RowExclusiveLock */
    LOCKBIT_ON(ShareLock) | LOCKBIT_ON(ShareRowExclusiveLock) |
    LOCKBIT_ON(ExclusiveLock) | LOCKBIT_ON(AccessExclusiveLock),

    /* ShareUpdateExclusiveLock */
    LOCKBIT_ON(ShareUpdateExclusiveLock) |
    LOCKBIT_ON(ShareLock) | LOCKBIT_ON(ShareRowExclusiveLock) |
    LOCKBIT_ON(ExclusiveLock) | LOCKBIT_ON(AccessExclusiveLock),

    /* ShareLock */
    LOCKBIT_ON(RowExclusiveLock) | LOCKBIT_ON(ShareUpdateExclusiveLock) |
    LOCKBIT_ON(ShareRowExclusiveLock) |
    LOCKBIT_ON(ExclusiveLock) | LOCKBIT_ON(AccessExclusiveLock),

    /* ShareRowExclusiveLock */
    LOCKBIT_ON(RowExclusiveLock) | LOCKBIT_ON(ShareUpdateExclusiveLock) |
    LOCKBIT_ON(ShareLock) | LOCKBIT_ON(ShareRowExclusiveLock) |
    LOCKBIT_ON(ExclusiveLock) | LOCKBIT_ON(AccessExclusiveLock),

    /* ExclusiveLock */
    LOCKBIT_ON(RowShareLock) |
    LOCKBIT_ON(RowExclusiveLock) | LOCKBIT_ON(ShareUpdateExclusiveLock) |
    LOCKBIT_ON(ShareLock) | LOCKBIT_ON(ShareRowExclusiveLock) |
    LOCKBIT_ON(ExclusiveLock) | LOCKBIT_ON(AccessExclusiveLock),

    /* AccessExclusiveLock (conflicts with every mode, including itself) */
    LOCKBIT_ON(AccessShareLock) | LOCKBIT_ON(RowShareLock) |
    LOCKBIT_ON(RowExclusiveLock) | LOCKBIT_ON(ShareUpdateExclusiveLock) |
    LOCKBIT_ON(ShareLock) | LOCKBIT_ON(ShareRowExclusiveLock) |
    LOCKBIT_ON(ExclusiveLock) | LOCKBIT_ON(AccessExclusiveLock)

};

/*
 * Names of lock modes, for debug printouts
 *
 * Indexed by lock mode number, in the same order as LockConflicts[]; slot 0
 * is the placeholder for the nonexistent mode 0.  Code in this file looks
 * names up as lockModeNames[lockmode] when building elog messages.
 */
static const char *const lock_mode_names[] =
{
    "INVALID",
    "AccessShareLock",
    "RowShareLock",
    "RowExclusiveLock",
    "ShareUpdateExclusiveLock",
    "ShareLock",
    "ShareRowExclusiveLock",
    "ExclusiveLock",
    "AccessExclusiveLock"
};

/*
 * Stand-in trace flag used when LOCK_DEBUG is not defined, so that both
 * lock method descriptors below still have a bool to point at.
 */
#ifndef LOCK_DEBUG
static bool Dummy_trace = false;
#endif

/*
 * Descriptor for the default lock method.  It shares the conflict table and
 * mode names with user_lockmethod; the two differ only in which trace flag
 * they reference when LOCK_DEBUG is defined (Trace_locks here).
 */
static const LockMethodData default_lockmethod = {
    AccessExclusiveLock,        /* highest valid lock mode number */
    LockConflicts,                /* conflict bitmask per lock mode */
    lock_mode_names,            /* printable name per lock mode */
#ifdef LOCK_DEBUG
    &Trace_locks
#else
    &Dummy_trace
#endif
};

/*
 * Descriptor for the user lock method.  Same lock semantics as the default
 * method; under LOCK_DEBUG its tracing is controlled by Trace_userlocks.
 */
static const LockMethodData user_lockmethod = {
    AccessExclusiveLock,        /* highest valid lock mode number */
    LockConflicts,                /* conflict bitmask per lock mode */
    lock_mode_names,            /* printable name per lock mode */
#ifdef LOCK_DEBUG
    &Trace_userlocks
#else
    &Dummy_trace
#endif
};

/*
 * map from lock method id to the lock table data structures
 *
 * Slot 0 is deliberately NULL: the lookups in this file require
 * 0 < lockmethodid < lengthof(LockMethods) before indexing this array.
 */
static const LockMethod LockMethods[] = {
    NULL,
    &default_lockmethod,
    &user_lockmethod
};


/* Record that's written to 2PC state file when a lock is persisted */
typedef struct TwoPhaseLockRecord
{
    LOCKTAG        locktag;        /* tag of the persisted lock */
    LOCKMODE    lockmode;        /* lock mode recorded for that tag */
} TwoPhaseLockRecord;


/*
 * Count of the number of fast path lock slots we believe to be used.  This
 * might be higher than the real number if another backend has transferred
 * our locks to the primary lock table, but it can never be lower than the
 * real value, since only we can acquire locks on our own behalf.
 */
static int    FastPathLocalUseCount = 0;

/*
 * Macros for manipulating proc->fpLockBits
 *
 * fpLockBits is treated as an array of FAST_PATH_BITS_PER_SLOT-bit groups,
 * one group per fast-path slot "n".  Within a group, bit
 * (l - FAST_PATH_LOCKNUMBER_OFFSET) records lock mode "l", so the modes that
 * can be represented are FAST_PATH_LOCKNUMBER_OFFSET up to
 * FAST_PATH_LOCKNUMBER_OFFSET + FAST_PATH_BITS_PER_SLOT - 1.
 *
 * Every macro argument is parenthesized in the expansion so that an
 * expression argument (e.g. a slot number written as "i + 1") is used as a
 * unit, and the SET/CLEAR macros are wrapped so they expand to a single
 * parenthesized expression.
 */
#define FAST_PATH_BITS_PER_SLOT            3
#define FAST_PATH_LOCKNUMBER_OFFSET        1
#define FAST_PATH_MASK                    ((1 << FAST_PATH_BITS_PER_SLOT) - 1)

/* Extract the whole bit group for slot n */
#define FAST_PATH_GET_BITS(proc, n) \
    (((proc)->fpLockBits >> (FAST_PATH_BITS_PER_SLOT * (n))) & FAST_PATH_MASK)

/* Bit index within fpLockBits for lock mode l in slot n (range-asserted) */
#define FAST_PATH_BIT_POSITION(n, l) \
    (AssertMacro((l) >= FAST_PATH_LOCKNUMBER_OFFSET), \
     AssertMacro((l) < FAST_PATH_BITS_PER_SLOT+FAST_PATH_LOCKNUMBER_OFFSET), \
     AssertMacro((n) < FP_LOCK_SLOTS_PER_BACKEND), \
     ((l) - FAST_PATH_LOCKNUMBER_OFFSET + FAST_PATH_BITS_PER_SLOT * (n)))

/* Set, clear, or test the bit for lock mode l in slot n */
#define FAST_PATH_SET_LOCKMODE(proc, n, l) \
     ((proc)->fpLockBits |= UINT64CONST(1) << FAST_PATH_BIT_POSITION(n, l))
#define FAST_PATH_CLEAR_LOCKMODE(proc, n, l) \
     ((proc)->fpLockBits &= ~(UINT64CONST(1) << FAST_PATH_BIT_POSITION(n, l)))
#define FAST_PATH_CHECK_LOCKMODE(proc, n, l) \
     ((proc)->fpLockBits & (UINT64CONST(1) << FAST_PATH_BIT_POSITION(n, l)))

/*
 * The fast-path lock mechanism is concerned only with relation locks on
 * unshared relations by backends bound to a database.  The fast-path
 * mechanism exists mostly to accelerate acquisition and release of locks
 * that rarely conflict.  Because ShareUpdateExclusiveLock is
 * self-conflicting, it can't use the fast-path mechanism; but it also does
 * not conflict with any of the locks that do, so we can ignore it completely.
 */

/*
 * True if a request for "mode" on "locktag" may be taken via the fast path:
 * default lock method, relation lock, locktag_field1 equal to our own valid
 * MyDatabaseId, and a mode numerically below ShareUpdateExclusiveLock.
 */
#define EligibleForRelationFastPath(locktag, mode) \
    ((locktag)->locktag_lockmethodid == DEFAULT_LOCKMETHOD && \
    (locktag)->locktag_type == LOCKTAG_RELATION && \
    (locktag)->locktag_field1 == MyDatabaseId && \
    MyDatabaseId != InvalidOid && \
    (mode) < ShareUpdateExclusiveLock)

/*
 * True if a request for "mode" on "locktag" is a "strong" relation lock,
 * i.e. one that could conflict with fast-path locks: default lock method,
 * relation lock, locktag_field1 not InvalidOid, and a mode numerically
 * above ShareUpdateExclusiveLock.
 */
#define ConflictsWithRelationFastPath(locktag, mode) \
    ((locktag)->locktag_lockmethodid == DEFAULT_LOCKMETHOD && \
    (locktag)->locktag_type == LOCKTAG_RELATION && \
    (locktag)->locktag_field1 != InvalidOid && \
    (mode) > ShareUpdateExclusiveLock)

/*
 * Fast-path helper routines.  Their definitions are not part of this
 * section of the file.
 */
static bool FastPathGrantRelationLock(Oid relid, LOCKMODE lockmode);
static bool FastPathUnGrantRelationLock(Oid relid, LOCKMODE lockmode);
static bool FastPathTransferRelationLocks(LockMethod lockMethodTable,
                              const LOCKTAG *locktag, uint32 hashcode);
static PROCLOCK *FastPathGetRelationLockEntry(LOCALLOCK *locallock);

/*
 * To make the fast-path lock mechanism work, we must have some way of
 * preventing the use of the fast-path when a conflicting lock might be
 * present.  We partition the locktag space into
 * FAST_PATH_STRONG_LOCK_HASH_PARTITIONS partitions, and maintain an integer
 * count of the number of "strong" lockers in each partition.  When any
 * "strong" lockers are present (which is hopefully not very often), the
 * fast-path mechanism can't be used, and we must fall back to the slower
 * method of pushing matching locks directly into the main lock tables.
 *
 * The deadlock detector does not know anything about the fast path mechanism,
 * so any locks that might be involved in a deadlock must be transferred from
 * the fast-path queues to the main lock table.
 */

/* Number of strong-lock partitions is 2^FAST_PATH_STRONG_LOCK_HASH_BITS */
#define FAST_PATH_STRONG_LOCK_HASH_BITS            10
#define FAST_PATH_STRONG_LOCK_HASH_PARTITIONS \
    (1 << FAST_PATH_STRONG_LOCK_HASH_BITS)
/* Map a locktag hash code to its strong-lock partition number */
#define FastPathStrongLockHashPartition(hashcode) \
    ((hashcode) % FAST_PATH_STRONG_LOCK_HASH_PARTITIONS)

/*
 * Per-partition counters of "strong" lockers.  InitLocks() obtains this
 * structure with ShmemInitStruct() and initializes the spinlock the first
 * time it is created.
 */
typedef struct
{
    slock_t        mutex;        /* NOTE(review): presumably guards count[] -- confirm against the updaters */
    uint32        count[FAST_PATH_STRONG_LOCK_HASH_PARTITIONS];    /* one counter per partition */
} FastPathStrongRelationLockData;

static volatile FastPathStrongRelationLockData *FastPathStrongRelationLocks;


/*
 * Pointers to hash tables containing lock state
 *
 * The LockMethodLockHash and LockMethodProcLockHash hash tables are in
 * shared memory; LockMethodLocalHash is local to each backend.
 *
 * All three are created by InitLocks().
 */
static HTAB *LockMethodLockHash;        /* "LOCK hash": LOCKTAG -> LOCK */
static HTAB *LockMethodProcLockHash;    /* "PROCLOCK hash": PROCLOCKTAG -> PROCLOCK */
static HTAB *LockMethodLocalHash;        /* "LOCALLOCK hash": LOCALLOCKTAG -> LOCALLOCK */


/* private state for error cleanup */
static LOCALLOCK *StrongLockInProgress;
static LOCALLOCK *awaitedLock;
static ResourceOwner awaitedOwner;

/*
 * Common implementation behind LockAcquireExtended() and, under PGXC,
 * LockIncrementIfExists().  only_increment is the XC-specific extra
 * argument; it is passed as true only by LockIncrementIfExists().
 */
static LockAcquireResult LockAcquireExtendedXC(const LOCKTAG *locktag,
LOCKMODE lockmode, bool sessionLock, bool dontWait, bool reportMemoryError,
bool only_increment);


#ifdef LOCK_DEBUG

/*------
 * The following configuration options are available for lock debugging:
 *
 *       TRACE_LOCKS       -- give a bunch of output about what's going on in this file
 *       TRACE_USERLOCKS   -- same but for user locks
 *       TRACE_LOCK_OIDMIN -- do not trace locks for tables below this oid
 *                            (use to avoid output on system tables)
 *       TRACE_LOCK_TABLE  -- trace locks on this table (oid) unconditionally
 *       DEBUG_DEADLOCKS   -- currently dumps locks at untimely occasions ;)
 *
 * Furthermore, but in storage/lmgr/lwlock.c:
 *       TRACE_LWLOCKS     -- trace lightweight locks (pretty useless)
 *
 * Define LOCK_DEBUG at compile time to get all these enabled.
 * --------
 */

/* Run-time switches consulted by the LOCK_DEBUG tracing helpers below */
int            Trace_lock_oidmin = FirstNormalObjectId;    /* don't trace locks whose locktag_field2 is below this oid */
bool        Trace_locks = false;    /* trace flag of the default lock method */
bool        Trace_userlocks = false;    /* trace flag of the user lock method */
int            Trace_lock_table = 0;    /* if nonzero, always trace locks with this locktag_field2 */
bool        Debug_deadlocks = false;    /* not referenced in this part of the file */


inline static bool
LOCK_DEBUG_ENABLED(const LOCKTAG *tag)
{
    return
        (*(LockMethods[tag->locktag_lockmethodid]->trace_flag) &&
         ((Oid) tag->locktag_field2 >= (Oid) Trace_lock_oidmin))
        || (Trace_lock_table &&
            (tag->locktag_field2 == Trace_lock_table));
}


/*
 * Log the state of a LOCK object (tag fields, grant mask, per-mode request
 * and grant counts, wait queue size) together with the name of lock mode
 * 'type'.  Does nothing unless tracing is enabled for the lock's tag.
 */
inline static void
LOCK_PRINT(const char *where, const LOCK *lock, LOCKMODE type)
{
    const LOCKTAG *tag = &lock->tag;

    if (!LOCK_DEBUG_ENABLED(tag))
        return;

    elog(LOG,
         "%s: lock(%p) id(%u,%u,%u,%u,%u,%u) grantMask(%x) "
         "req(%d,%d,%d,%d,%d,%d,%d)=%d "
         "grant(%d,%d,%d,%d,%d,%d,%d)=%d wait(%d) type(%s)",
         where, lock,
         tag->locktag_field1, tag->locktag_field2,
         tag->locktag_field3, tag->locktag_field4,
         tag->locktag_type, tag->locktag_lockmethodid,
         lock->grantMask,
         lock->requested[1], lock->requested[2], lock->requested[3],
         lock->requested[4], lock->requested[5], lock->requested[6],
         lock->requested[7], lock->nRequested,
         lock->granted[1], lock->granted[2], lock->granted[3],
         lock->granted[4], lock->granted[5], lock->granted[6],
         lock->granted[7], lock->nGranted,
         lock->waitProcs.size,
         LockMethods[LOCK_LOCKMETHOD(*lock)]->lockModeNames[type]);
}


/*
 * Log a PROCLOCK object: its address, the LOCK and PGPROC it links, its
 * lock method and its hold mask.  Does nothing unless tracing is enabled
 * for the tag of the underlying LOCK.
 */
inline static void
PROCLOCK_PRINT(const char *where, const PROCLOCK *proclockP)
{
    if (!LOCK_DEBUG_ENABLED(&proclockP->tag.myLock->tag))
        return;

    elog(LOG,
         "%s: proclock(%p) lock(%p) method(%u) proc(%p) hold(%x)",
         where,
         proclockP,
         proclockP->tag.myLock,
         PROCLOCK_LOCKMETHOD(*(proclockP)),
         proclockP->tag.myProc,
         (int) proclockP->holdMask);
}
#else                            /* not LOCK_DEBUG */

/* Without LOCK_DEBUG the tracing calls compile to no-op expressions */
#define LOCK_PRINT(where, lock, type)  ((void) 0)
#define PROCLOCK_PRINT(where, proclockP)  ((void) 0)
#endif                            /* not LOCK_DEBUG */


/*
 * Forward declarations of file-local routines.  proclock_hash() is the
 * custom hash function installed for the PROCLOCK hash table by InitLocks().
 */
static uint32 proclock_hash(const void *key, Size keysize);
static void RemoveLocalLock(LOCALLOCK *locallock);
static PROCLOCK *SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc,
                 const LOCKTAG *locktag, uint32 hashcode, LOCKMODE lockmode);
static void GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner);
static void BeginStrongLockAcquire(LOCALLOCK *locallock, uint32 fasthashcode);
static void FinishStrongLockAcquire(void);
static void WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner);
static void ReleaseLockIfHeld(LOCALLOCK *locallock, bool sessionLock);
static void LockReassignOwner(LOCALLOCK *locallock, ResourceOwner parent);
static bool UnGrantLock(LOCK *lock, LOCKMODE lockmode,
            PROCLOCK *proclock, LockMethod lockMethodTable);
static void CleanUpLock(LOCK *lock, PROCLOCK *proclock,
            LockMethod lockMethodTable, uint32 hashcode,
            bool wakeupNeeded);
static void LockRefindAndRelease(LockMethod lockMethodTable, PGPROC *proc,
                     LOCKTAG *locktag, LOCKMODE lockmode,
                     bool decrement_strong_lock_count);
static void GetSingleProcBlockerStatusData(PGPROC *blocked_proc,
                               BlockedProcsData *data);


/*
 * InitLocks -- Initialize the lock manager's data structures.
 *
 * This is called from CreateSharedMemoryAndSemaphores(), which see for
 * more comments.  In the normal postmaster case, the shared hash tables
 * are created here, as well as a locallock hash table that will remain
 * unused and empty in the postmaster itself.  Backends inherit the pointers
 * to the shared tables via fork(), and also inherit an image of the locallock
 * hash table, which they proceed to use.  In the EXEC_BACKEND case, each
 * backend re-executes this code to obtain pointers to the already existing
 * shared hash tables and to create its locallock hash table.
 */
void
InitLocks(void)
{
    HASHCTL        info;
    long        init_table_size,
                max_table_size;
    bool        found;

    /*
     * Compute init/max size to request for lock hashtables.  Note these
     * calculations must agree with LockShmemSize!
     */
    max_table_size = NLOCKENTS();
    init_table_size = max_table_size / 2;

    /*
     * Allocate hash table for LOCK structs.  This stores per-locked-object
     * information.
     */
    MemSet(&info, 0, sizeof(info));
    info.keysize = sizeof(LOCKTAG);
    info.entrysize = sizeof(LOCK);
    info.num_partitions = NUM_LOCK_PARTITIONS;

    LockMethodLockHash = ShmemInitHash("LOCK hash",
                                       init_table_size,
                                       max_table_size,
                                       &info,
                                       HASH_ELEM | HASH_BLOBS | HASH_PARTITION);

    /* Assume an average of 2 holders per lock */
    max_table_size *= 2;
    init_table_size *= 2;

    /*
     * Allocate hash table for PROCLOCK structs.  This stores
     * per-lock-per-holder information.
     */
    info.keysize = sizeof(PROCLOCKTAG);
    info.entrysize = sizeof(PROCLOCK);
    info.hash = proclock_hash;
    info.num_partitions = NUM_LOCK_PARTITIONS;

    LockMethodProcLockHash = ShmemInitHash("PROCLOCK hash",
                                           init_table_size,
                                           max_table_size,
                                           &info,
                                           HASH_ELEM | HASH_FUNCTION | HASH_PARTITION);

    /*
     * Allocate fast-path structures.
     */
    FastPathStrongRelationLocks =
        ShmemInitStruct("Fast Path Strong Relation Lock Data",
                        sizeof(FastPathStrongRelationLockData), &found);
    if (!found)
        SpinLockInit(&FastPathStrongRelationLocks->mutex);

    /*
     * Allocate non-shared hash table for LOCALLOCK structs.  This stores lock
     * counts and resource owner information.
     *
     * The non-shared table could already exist in this process (this occurs
     * when the postmaster is recreating shared memory after a backend crash).
     * If so, delete and recreate it.  (We could simply leave it, since it
     * ought to be empty in the postmaster, but for safety let's zap it.)
     */
    if (LockMethodLocalHash)
        hash_destroy(LockMethodLocalHash);

    info.keysize = sizeof(LOCALLOCKTAG);
    info.entrysize = sizeof(LOCALLOCK);

    LockMethodLocalHash = hash_create("LOCALLOCK hash",
                                      16,
                                      &info,
                                      HASH_ELEM | HASH_BLOBS);
}


/*
 * Fetch the lock method table associated with a given lock
 */
LockMethod
GetLocksMethodTable(const LOCK *lock)
{
    LOCKMETHODID lockmethodid = LOCK_LOCKMETHOD(*lock);

    Assert(0 < lockmethodid && lockmethodid < lengthof(LockMethods));
    return LockMethods[lockmethodid];
}

/*
 * Fetch the lock method table associated with a given locktag
 */
LockMethod
GetLockTagsMethodTable(const LOCKTAG *locktag)
{
    LOCKMETHODID lockmethodid = (LOCKMETHODID) locktag->locktag_lockmethodid;

    Assert(0 < lockmethodid && lockmethodid < lengthof(LockMethods));
    return LockMethods[lockmethodid];
}


/*
 * Compute the hash code associated with a LOCKTAG.
 *
 * The hash code is meant to be computed just once per function and then
 * passed around, to avoid needless recomputation.  Besides being handed to
 * hash_search_with_hash_value(), the hash code also yields the lock
 * partition number.
 */
uint32
LockTagHashCode(const LOCKTAG *locktag)
{
    const void *key = (const void *) locktag;

    /* Hash with the LOCK table's own hash function */
    return get_hash_value(LockMethodLockHash, key);
}

/*
 * Compute the hash code associated with a PROCLOCKTAG.
 *
 * A single set of partition locks serves both the LOCK and PROCLOCK hash
 * tables, so every PROCLOCK has to land in the same partition as the LOCK
 * it belongs to.  dynahash.c takes the partition number from the low-order
 * bits of the hash code; hence a PROCLOCKTAG's hash code must share its
 * low-order bits with the hash code of the associated LOCKTAG.  This
 * specialized hash function guarantees that.
 */
static uint32
proclock_hash(const void *key, Size keysize)
{
    const PROCLOCKTAG *tag = (const PROCLOCKTAG *) key;
    Datum        procAddr;
    uint32        result;

    Assert(keysize == sizeof(PROCLOCKTAG));

    /* Start from the hash code of the LOCK this PROCLOCK refers to */
    result = LockTagHashCode(&tag->myLock->tag);

    /*
     * Mix in the PGPROC by xor'ing the proc struct's address into the hash,
     * shifted left so the partition-number bits stay untouched.  Losing
     * high-order address bits is harmless for a hash; going through a Datum
     * avoids cast-pointer-to-int warnings.
     */
    procAddr = PointerGetDatum(tag->myProc);

    return result ^ (((uint32) procAddr) << LOG2_NUM_LOCK_PARTITIONS);
}

/*
 * Compute the hash code associated with a PROCLOCKTAG when the hash code of
 * its underlying LOCK is already known.
 *
 * This exists only to save a redundant LockTagHashCode() call.
 */
static inline uint32
ProcLockHashCode(const PROCLOCKTAG *proclocktag, uint32 hashcode)
{
    /*
     * This must match proclock_hash()!  The proc address is xor'ed in above
     * the partition-number bits.
     */
    Datum        procAddr = PointerGetDatum(proclocktag->myProc);

    return hashcode ^ (((uint32) procAddr) << LOG2_NUM_LOCK_PARTITIONS);
}

/*
 * Report whether two lock modes conflict, according to the conflict table
 * of the default lock method.
 */
bool
DoLockModesConflict(LOCKMODE mode1, LOCKMODE mode2)
{
    LockMethod    defaultMethod = LockMethods[DEFAULT_LOCKMETHOD];
    LOCKMASK    conflictsOfMode1 = defaultMethod->conflictTab[mode1];

    return (conflictsOfMode1 & LOCKBIT_ON(mode2)) != 0;
}

/*
 * LockHasWaiters -- look up 'locktag' and report whether releasing this
 *        lock would wake up other processes waiting for it.
 *
 * Raises ERROR for an unrecognized lock method or lock mode.  If the caller
 * does not hold the lock in the given mode, a WARNING is issued and false is
 * returned.  The sessionLock argument is not consulted here.
 */
bool
LockHasWaiters(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
{// #lizard forgives
    LOCKMETHODID methodId = locktag->locktag_lockmethodid;
    LockMethod    methodTable;
    LOCALLOCKTAG key;
    LOCALLOCK  *local;
    LOCK       *sharedLock;
    PROCLOCK   *holder;
    LWLock       *partLock;
    bool        result;

    /* Validate the lock method and mode before touching any table */
    if (methodId <= 0 || methodId >= lengthof(LockMethods))
        elog(ERROR, "unrecognized lock method: %d", methodId);
    methodTable = LockMethods[methodId];
    if (lockmode <= 0 || lockmode > methodTable->numLockModes)
        elog(ERROR, "unrecognized lock mode: %d", lockmode);

#ifdef LOCK_DEBUG
    if (LOCK_DEBUG_ENABLED(locktag))
        elog(LOG, "LockHasWaiters: lock [%u,%u] %s",
             locktag->locktag_field1, locktag->locktag_field2,
             methodTable->lockModeNames[lockmode]);
#endif

    /*
     * Look up the LOCALLOCK entry for this lock and lockmode.  The key is
     * zeroed first because its padding bytes take part in the lookup.
     */
    MemSet(&key, 0, sizeof(key));
    key.lock = *locktag;
    key.mode = lockmode;

    local = (LOCALLOCK *) hash_search(LockMethodLocalHash,
                                      (void *) &key,
                                      HASH_FIND, NULL);

    /*
     * Not held locally: only warn, so the caller can print its own error
     * message as well.  Do not ereport(ERROR).
     */
    if (local == NULL || local->nLocks <= 0)
    {
        elog(WARNING, "you don't own a lock of type %s",
             methodTable->lockModeNames[lockmode]);
        return false;
    }

    /* Now consult the shared lock table, under its partition lock */
    partLock = LockHashPartitionLock(local->hashcode);
    LWLockAcquire(partLock, LW_SHARED);

    /*
     * There is no need to look the lock or proclock up again: their
     * addresses were saved in the locallock entry, and they cannot have been
     * removed while we held a lock on them.
     */
    sharedLock = local->lock;
    LOCK_PRINT("LockHasWaiters: found", sharedLock, lockmode);
    holder = local->proclock;
    PROCLOCK_PRINT("LockHasWaiters: found", holder);

    /*
     * Sanity check: the shared state must agree that we hold a lock of the
     * requested mode.
     */
    if ((holder->holdMask & LOCKBIT_ON(lockmode)) == 0)
    {
        PROCLOCK_PRINT("LockHasWaiters: WRONGTYPE", holder);
        LWLockRelease(partLock);
        elog(WARNING, "you don't own a lock of type %s",
             methodTable->lockModeNames[lockmode]);
        RemoveLocalLock(local);
        return false;
    }

    /* Waiters exist if any awaited mode conflicts with the mode we hold */
    result = (methodTable->conflictTab[lockmode] & sharedLock->waitMask) != 0;

    LWLockRelease(partLock);

    return result;
}

/*
 * LockAcquire -- check for lock conflicts, sleep if a conflict is found,
 *        and set the lock if/when there are no conflicts.
 *
 * Inputs:
 *    locktag: unique identifier for the lockable object
 *    lockmode: lock mode to acquire
 *    sessionLock: if true, acquire lock for session not current transaction
 *    dontWait: if true, don't wait to acquire lock
 *
 * Returns one of:
 *        LOCKACQUIRE_NOT_AVAIL        lock not available, and dontWait=true
 *        LOCKACQUIRE_OK                lock successfully acquired
 *        LOCKACQUIRE_ALREADY_HELD    incremented count for lock already held
 *
 * The return value can be ignored in the common case where dontWait=false
 * and the caller has no need to tell a freshly acquired lock apart from one
 * taken earlier in the same transaction.
 *
 * Side Effects: The lock is acquired and recorded in lock tables.
 *
 * NOTE: once we start waiting for the lock, the only way to abort the wait
 * is to abort the transaction.
 */
LockAcquireResult
LockAcquire(const LOCKTAG *locktag,
            LOCKMODE lockmode,
            bool sessionLock,
            bool dontWait)
{
    /* Plain LockAcquire always asks for memory errors to be reported */
    const bool    reportMemoryError = true;

    return LockAcquireExtended(locktag, lockmode, sessionLock, dontWait,
                               reportMemoryError);
}

#ifdef PGXC
/*
 * LockIncrementIfExists - special-purpose variant of LockAcquire().
 * If a reference to the lock already exists, its count is incremented and
 * true is returned; otherwise false is returned.  In effect, this never
 * creates a new lock.
 */
bool
LockIncrementIfExists(const LOCKTAG *locktag,
            LOCKMODE lockmode, bool sessionLock)
{
    LockAcquireResult outcome;

    outcome = LockAcquireExtendedXC(locktag, lockmode,
                                    sessionLock,
                                    true,    /* dontWait: never wait */
                                    true,    /* reportMemoryError */
                                    true);    /* only_increment */

    /* Only an increment of an already-held lock counts as success */
    return outcome == LOCKACQUIRE_ALREADY_HELD;
}
#endif

/*
 * LockAcquireExtended - LockAcquire with additional options
 *
 * reportMemoryError says whether a lock request that fills the lock table
 * should raise an ERROR.  Turning it off lets a priority caller notice that
 * the lock table is full and take extreme action to reduce the number of
 * other lock holders before retrying.
 */
LockAcquireResult
LockAcquireExtended(const LOCKTAG *locktag,
                    LOCKMODE lockmode,
                    bool sessionLock,
                    bool dontWait,
                    bool reportMemoryError)
{
    /* Regular acquisition: creating a new lock entry is allowed */
    const bool    only_increment = false;

    return LockAcquireExtendedXC(locktag, lockmode, sessionLock, dontWait,
                                 reportMemoryError, only_increment);
}

/*
 * LockAcquireExtendedXC - additional parameter only_increment. This is XC
 * specific. Check comments for the function LockIncrementIfExists()
 */
static LockAcquireResult
LockAcquireExtendedXC(const LOCKTAG *locktag,
                    LOCKMODE lockmode,
                    bool sessionLock,
                    bool dontWait,
                    bool reportMemoryError,
                    bool only_increment)
{// #lizard forgives
    LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
    LockMethod    lockMethodTable;
    LOCALLOCKTAG localtag;
    LOCALLOCK  *locallock;
    LOCK       *lock;
    PROCLOCK   *proclock;
    bool        found;
    ResourceOwner owner;
    uint32        hashcode;
    LWLock       *partitionLock;
    int            status;
    bool        log_lock = false;

    if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
        elog(ERROR, "unrecognized lock method: %d", lockmethodid);
    lockMethodTable = LockMethods[lockmethodid];
    if (lockmode <= 0 || lockmode > lockMethodTable->numLockModes)
        elog(ERROR, "unrecognized lock mode: %d", lockmode);

    if (RecoveryInProgress() && !InRecovery &&
        (locktag->locktag_type == LOCKTAG_OBJECT ||
         locktag->locktag_type == LOCKTAG_RELATION) &&
        lockmode > RowExclusiveLock)
        ereport(ERROR,
                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                 errmsg("cannot acquire lock mode %s on database objects while recovery is in progress",
                        lockMethodTable->lockModeNames[lockmode]),
                 errhint("Only RowExclusiveLock or less can be acquired on database objects during recovery.")));

#ifdef LOCK_DEBUG
    if (LOCK_DEBUG_ENABLED(locktag))
        elog(LOG, "LockAcquire: lock [%u,%u] %s",
             locktag->locktag_field1, locktag->locktag_field2,
             lockMethodTable->lockModeNames[lockmode]);
#endif

    /* Identify owner for lock */
    if (sessionLock)
        owner = NULL;
    else
        owner = CurrentResourceOwner;

    /*
     * Find or create a LOCALLOCK entry for this lock and lockmode
     */
    MemSet(&localtag, 0, sizeof(localtag)); /* must clear padding */
    localtag.lock = *locktag;
    localtag.mode = lockmode;

    locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash,
                                          (void *) &localtag,
                                          HASH_ENTER, &found);

    /*
     * if it's a new locallock object, initialize it
     */
    if (!found)
    {
        locallock->lock = NULL;
        locallock->proclock = NULL;
        locallock->hashcode = LockTagHashCode(&(localtag.lock));
        locallock->nLocks = 0;
        locallock->numLockOwners = 0;
        locallock->maxLockOwners = 8;
        locallock->holdsStrongLockCount = FALSE;
        locallock->lockOwners = NULL;    /* in case next line fails */
        locallock->lockOwners = (LOCALLOCKOWNER *)
            MemoryContextAlloc(TopMemoryContext,
                               locallock->maxLockOwners * sizeof(LOCALLOCKOWNER));
    }
    else
    {
        /* Make sure there will be room to remember the lock */
        if (locallock->numLockOwners >= locallock->maxLockOwners)
        {
            int            newsize = locallock->maxLockOwners * 2;

            locallock->lockOwners = (LOCALLOCKOWNER *)
                repalloc(locallock->lockOwners,
                         newsize * sizeof(LOCALLOCKOWNER));
            locallock->maxLockOwners = newsize;
        }
    }
    hashcode = locallock->hashcode;

    /*
     * If we already hold the lock, we can just increase the count locally.
     */
    if (locallock->nLocks > 0)
    {
        GrantLockLocal(locallock, owner);
        return LOCKACQUIRE_ALREADY_HELD;
    }
#ifdef PGXC
    else if (only_increment)
    {
        /* User does not want to create new lock if it does not already exist */
        return LOCKACQUIRE_NOT_AVAIL;
    }
#endif
    /*
     * Prepare to emit a WAL record if acquisition of this lock needs to be
     * replayed in a standby server.
     *
     * Here we prepare to log; after lock is acquired we'll issue log record.
     * This arrangement simplifies error recovery in case the preparation step
     * fails.
     *
     * Only AccessExclusiveLocks can conflict with lock types that read-only
     * transactions can acquire in a standby server. Make sure this definition
     * matches the one in GetRunningTransactionLocks().
     */
    if (lockmode >= AccessExclusiveLock &&
        locktag->locktag_type == LOCKTAG_RELATION &&
        !RecoveryInProgress() &&
        XLogStandbyInfoActive())
    {
        LogAccessExclusiveLockPrepare();
        log_lock = true;
    }

    /*
     * Attempt to take lock via fast path, if eligible.  But if we remember
     * having filled up the fast path array, we don't attempt to make any
     * further use of it until we release some locks.  It's possible that some
     * other backend has transferred some of those locks to the shared hash
     * table, leaving space free, but it's not worth acquiring the LWLock just
     * to check.  It's also possible that we're acquiring a second or third
     * lock type on a relation we have already locked using the fast-path, but
     * for now we don't worry about that case either.
     */
    if (EligibleForRelationFastPath(locktag, lockmode) &&
        FastPathLocalUseCount < FP_LOCK_SLOTS_PER_BACKEND)
    {
        uint32        fasthashcode = FastPathStrongLockHashPartition(hashcode);
        bool        acquired;

        /*
         * LWLockAcquire acts as a memory sequencing point, so it's safe to
         * assume that any strong locker whose increment to
         * FastPathStrongRelationLocks->counts becomes visible after we test
         * it has yet to begin to transfer fast-path locks.
         */
        LWLockAcquire(&MyProc->backendLock, LW_EXCLUSIVE);
        if (FastPathStrongRelationLocks->count[fasthashcode] != 0)
            acquired = false;
        else
            acquired = FastPathGrantRelationLock(locktag->locktag_field2,
                                                 lockmode);
        LWLockRelease(&MyProc->backendLock);
        if (acquired)
        {
            /*
             * The locallock might contain stale pointers to some old shared
             * objects; we MUST reset these to null before considering the
             * lock to be acquired via fast-path.
             */
            locallock->lock = NULL;
            locallock->proclock = NULL;
            GrantLockLocal(locallock, owner);
            return LOCKACQUIRE_OK;
        }
    }

    /*
     * If this lock could potentially have been taken via the fast-path by
     * some other backend, we must (temporarily) disable further use of the
     * fast-path for this lock tag, and migrate any locks already taken via
     * this method to the main lock table.
     */
    if (ConflictsWithRelationFastPath(locktag, lockmode))
    {
        uint32        fasthashcode = FastPathStrongLockHashPartition(hashcode);

        BeginStrongLockAcquire(locallock, fasthashcode);
        if (!FastPathTransferRelationLocks(lockMethodTable, locktag,
                                           hashcode))
        {
            AbortStrongLockAcquire();
            if (reportMemoryError)
                ereport(ERROR,
                        (errcode(ERRCODE_OUT_OF_MEMORY),
                         errmsg("out of shared memory"),
                         errhint("You might need to increase max_locks_per_transaction.")));
            else
                return LOCKACQUIRE_NOT_AVAIL;
        }
    }

    /*
     * We didn't find the lock in our LOCALLOCK table, and we didn't manage to
     * take it via the fast-path, either, so we've got to mess with the shared
     * lock table.
     */
    partitionLock = LockHashPartitionLock(hashcode);

    LWLockAcquire(partitionLock, LW_EXCLUSIVE);

    /*
     * Find or create lock and proclock entries with this tag
     *
     * Note: if the locallock object already existed, it might have a pointer
     * to the lock already ... but we should not assume that that pointer is
     * valid, since a lock object with zero hold and request counts can go
     * away anytime.  So we have to use SetupLockInTable() to recompute the
     * lock and proclock pointers, even if they're already set.
     */
    proclock = SetupLockInTable(lockMethodTable, MyProc, locktag,
                                hashcode, lockmode);
    if (!proclock)
    {
        AbortStrongLockAcquire();
        LWLockRelease(partitionLock);
        if (reportMemoryError)
            ereport(ERROR,
                    (errcode(ERRCODE_OUT_OF_MEMORY),
                     errmsg("out of shared memory"),
                     errhint("You might need to increase max_locks_per_transaction.")));
        else
            return LOCKACQUIRE_NOT_AVAIL;
    }
    locallock->proclock = proclock;
    lock = proclock->tag.myLock;
    locallock->lock = lock;

    /*
     * If lock requested conflicts with locks requested by waiters, must join
     * wait queue.  Otherwise, check for conflict with already-held locks.
     * (That's last because most complex check.)
     */
    if (lockMethodTable->conflictTab[lockmode] & lock->waitMask)
        status = STATUS_FOUND;
    else
        status = LockCheckConflicts(lockMethodTable, lockmode,
                                    lock, proclock, MyProc);

    if (status == STATUS_OK)
    {
        /* No conflict with held or previously requested locks */
        GrantLock(lock, proclock, lockmode);
        GrantLockLocal(locallock, owner);
    }
    else
    {
        Assert(status == STATUS_FOUND);

        /*
         * We can't acquire the lock immediately.  If caller specified no
         * blocking, remove useless table entries and return NOT_AVAIL without
         * waiting.
         */
        if (dontWait)
        {
            AbortStrongLockAcquire();
            if (proclock->holdMask == 0)
            {
                uint32        proclock_hashcode;

                proclock_hashcode = ProcLockHashCode(&proclock->tag, hashcode);
                SHMQueueDelete(&proclock->lockLink);
                SHMQueueDelete(&proclock->procLink);
                if (!hash_search_with_hash_value(LockMethodProcLockHash,
                                                 (void *) &(proclock->tag),
                                                 proclock_hashcode,
                                                 HASH_REMOVE,
                                                 NULL))
                    elog(PANIC, "proclock table corrupted");
            }
            else
                PROCLOCK_PRINT("LockAcquire: NOWAIT", proclock);
            lock->nRequested--;
            lock->requested[lockmode]--;
            LOCK_PRINT("LockAcquire: conditional lock failed", lock, lockmode);
            Assert((lock->nRequested > 0) && (lock->requested[lockmode] >= 0));
            Assert(lock->nGranted <= lock->nRequested);
            LWLockRelease(partitionLock);
            if (locallock->nLocks == 0)
                RemoveLocalLock(locallock);
            return LOCKACQUIRE_NOT_AVAIL;
        }

        /*
         * Set bitmask of locks this process already holds on this object.
         */
        MyProc->heldLocks = proclock->holdMask;

        /*
         * Sleep till someone wakes me up.
         */

        TRACE_POSTGRESQL_LOCK_WAIT_START(locktag->locktag_field1,
                                         locktag->locktag_field2,
                                         locktag->locktag_field3,
                                         locktag->locktag_field4,
                                         locktag->locktag_type,
                                         lockmode);

        WaitOnLock(locallock, owner);

        TRACE_POSTGRESQL_LOCK_WAIT_DONE(locktag->locktag_field1,
                                        locktag->locktag_field2,
                                        locktag->locktag_field3,
                                        locktag->locktag_field4,
                                        locktag->locktag_type,
                                        lockmode);

        /*
         * NOTE: do not do any material change of state between here and
         * return.  All required changes in locktable state must have been
         * done when the lock was granted to us --- see notes in WaitOnLock.
         */

        /*
         * Check the proclock entry status, in case something in the ipc
         * communication doesn't work correctly.
         */
        if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
        {
            AbortStrongLockAcquire();
            PROCLOCK_PRINT("LockAcquire: INCONSISTENT", proclock);
            LOCK_PRINT("LockAcquire: INCONSISTENT", lock, lockmode);
            /* Should we retry ? */
            LWLockRelease(partitionLock);
            elog(ERROR, "LockAcquire failed");
        }
        PROCLOCK_PRINT("LockAcquire: granted", proclock);
        LOCK_PRINT("LockAcquire: granted", lock, lockmode);
    }

    /*
     * Lock state is fully up-to-date now; if we error out after this, no
     * special error cleanup is required.
     */
    FinishStrongLockAcquire();

    LWLockRelease(partitionLock);

    /*
     * Emit a WAL record if acquisition of this lock needs to be replayed in a
     * standby server.
     */
    if (log_lock)
    {
        /*
         * Decode the locktag back to the original values, to avoid sending
         * lots of empty bytes with every message.  See lock.h to check how a
         * locktag is defined for LOCKTAG_RELATION
         */
        LogAccessExclusiveLock(locktag->locktag_field1,
                               locktag->locktag_field2);
    }

    return LOCKACQUIRE_OK;
}

/*
 * Find or create LOCK and PROCLOCK objects as needed for a new lock
 * request.
 *
 * lockMethodTable: lock method whose mode names are used in messages
 * proc: process on whose behalf the PROCLOCK is looked up or created
 * locktag: key of the LOCK in the shared lock hash table
 * hashcode: hash value of locktag, passed through to the hash lookups
 * lockmode: mode being requested
 *
 * On success the request counters of the LOCK (nRequested and
 * requested[lockmode]) have been incremented; nothing is granted here.
 *
 * Returns the PROCLOCK object, or NULL if we failed to create the objects
 * for lack of shared memory.  Raises ERROR if the PROCLOCK already holds
 * the requested mode.
 *
 * The appropriate partition lock must be held at entry, and will be
 * held at exit.
 */
static PROCLOCK *
SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc,
                 const LOCKTAG *locktag, uint32 hashcode, LOCKMODE lockmode)
{// #lizard forgives
    LOCK       *lock;
    PROCLOCK   *proclock;
    PROCLOCKTAG proclocktag;
    uint32        proclock_hashcode;
    bool        found;

    /*
     * Find or create a lock with this tag.  HASH_ENTER_NULL makes the lookup
     * hand back NULL instead of a new entry when it cannot create one; we
     * report that to the caller as NULL.
     */
    lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
                                                (const void *) locktag,
                                                hashcode,
                                                HASH_ENTER_NULL,
                                                &found);
    if (!lock)
        return NULL;

    /*
     * if it's a new lock object, initialize it
     */
    if (!found)
    {
        lock->grantMask = 0;
        lock->waitMask = 0;
        SHMQueueInit(&(lock->procLocks));
        ProcQueueInit(&(lock->waitProcs));
        lock->nRequested = 0;
        lock->nGranted = 0;
        MemSet(lock->requested, 0, sizeof(int) * MAX_LOCKMODES);
        MemSet(lock->granted, 0, sizeof(int) * MAX_LOCKMODES);
        LOCK_PRINT("LockAcquire: new", lock, lockmode);
    }
    else
    {
        /* Existing lock object: just sanity-check its counters. */
        LOCK_PRINT("LockAcquire: found", lock, lockmode);
        Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
        Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
        Assert(lock->nGranted <= lock->nRequested);
    }

    /*
     * Create the hash key for the proclock table.
     */
    proclocktag.myLock = lock;
    proclocktag.myProc = proc;

    proclock_hashcode = ProcLockHashCode(&proclocktag, hashcode);

    /*
     * Find or create a proclock entry with this tag
     */
    proclock = (PROCLOCK *) hash_search_with_hash_value(LockMethodProcLockHash,
                                                        (void *) &proclocktag,
                                                        proclock_hashcode,
                                                        HASH_ENTER_NULL,
                                                        &found);
    if (!proclock)
    {
        /* Oops, not enough shmem for the proclock */
        if (lock->nRequested == 0)
        {
            /*
             * There are no other requestors of this lock, so garbage-collect
             * the lock object.  We *must* do this to avoid a permanent leak
             * of shared memory, because there won't be anything to cause
             * anyone to release the lock object later.
             */
            Assert(SHMQueueEmpty(&(lock->procLocks)));
            if (!hash_search_with_hash_value(LockMethodLockHash,
                                             (void *) &(lock->tag),
                                             hashcode,
                                             HASH_REMOVE,
                                             NULL))
                elog(PANIC, "lock table corrupted");
        }
        return NULL;
    }

    /*
     * If new, initialize the new entry
     */
    if (!found)
    {
        uint32        partition = LockHashPartition(hashcode);

        /*
         * It might seem unsafe to access proclock->groupLeader without a
         * lock, but it's not really.  Either we are initializing a proclock
         * on our own behalf, in which case our group leader isn't changing
         * because the group leader for a process can only ever be changed by
         * the process itself; or else we are transferring a fast-path lock to
         * the main lock table, in which case that process can't change its
         * lock group leader without first releasing all of its locks (and in
         * particular the one we are currently transferring).
         */
        proclock->groupLeader = proc->lockGroupLeader != NULL ?
            proc->lockGroupLeader : proc;
        proclock->holdMask = 0;
        proclock->releaseMask = 0;
        /*
         * Add proclock to appropriate lists: the lock's list of proclocks
         * and the owning process's per-partition list.
         */
        SHMQueueInsertBefore(&lock->procLocks, &proclock->lockLink);
        SHMQueueInsertBefore(&(proc->myProcLocks[partition]),
                             &proclock->procLink);
        PROCLOCK_PRINT("LockAcquire: new", proclock);
    }
    else
    {
        PROCLOCK_PRINT("LockAcquire: found", proclock);
        /* Everything this proclock holds must also be marked in the lock. */
        Assert((proclock->holdMask & ~lock->grantMask) == 0);

#ifdef CHECK_DEADLOCK_RISK

        /*
         * Issue warning if we already hold a lower-level lock on this object
         * and do not hold a lock of the requested level or higher. This
         * indicates a deadlock-prone coding practice (eg, we'd have a
         * deadlock if another backend were following the same code path at
         * about the same time).
         *
         * This is not enabled by default, because it may generate log entries
         * about user-level coding practices that are in fact safe in context.
         * It can be enabled to help find system-level problems.
         *
         * XXX Doing numeric comparison on the lockmodes is a hack; it'd be
         * better to use a table.  For now, though, this works.
         */
        {
            int            i;

            /* Scan from the highest mode down; stop at the first one held. */
            for (i = lockMethodTable->numLockModes; i > 0; i--)
            {
                if (proclock->holdMask & LOCKBIT_ON(i))
                {
                    if (i >= (int) lockmode)
                        break;    /* safe: we have a lock >= req level */
                    elog(LOG, "deadlock risk: raising lock level"
                         " from %s to %s on object %u/%u/%u",
                         lockMethodTable->lockModeNames[i],
                         lockMethodTable->lockModeNames[lockmode],
                         lock->tag.locktag_field1, lock->tag.locktag_field2,
                         lock->tag.locktag_field3);
                    break;
                }
            }
        }
#endif                            /* CHECK_DEADLOCK_RISK */
    }

    /*
     * lock->nRequested and lock->requested[] count the total number of
     * requests, whether granted or waiting, so increment those immediately.
     * The other counts don't increment till we get the lock.
     */
    lock->nRequested++;
    lock->requested[lockmode]++;
    Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));

    /*
     * We shouldn't already hold the desired lock; else locallock table is
     * broken.  Note that this check runs after the request counters above
     * have been incremented.
     */
    if (proclock->holdMask & LOCKBIT_ON(lockmode))
        elog(ERROR, "lock %s on object %u/%u/%u is already held",
             lockMethodTable->lockModeNames[lockmode],
             lock->tag.locktag_field1, lock->tag.locktag_field2,
             lock->tag.locktag_field3);

    return proclock;
}

/*
 * RemoveLocalLock -- discard a LOCALLOCK entry
 *
 * Detaches the entry from every resource owner recorded for it, frees the
 * owner array, gives back the strong-lock count if this entry holds one,
 * and finally removes the entry from the local lock hash table.
 */
static void
RemoveLocalLock(LOCALLOCK *locallock)
{
    int            idx = locallock->numLockOwners;

    /* Visit the recorded owners from the last array slot to the first. */
    while (--idx >= 0)
    {
        ResourceOwner resowner = locallock->lockOwners[idx].owner;

        /* A NULL owner has no resource owner to notify. */
        if (resowner != NULL)
            ResourceOwnerForgetLock(resowner, locallock);
    }
    locallock->numLockOwners = 0;

    /* Release the owner array, leaving a NULL pointer behind. */
    if (locallock->lockOwners != NULL)
    {
        pfree(locallock->lockOwners);
        locallock->lockOwners = NULL;
    }

    /* Undo our contribution to the shared strong-lock counter, if any. */
    if (locallock->holdsStrongLockCount)
    {
        uint32        slot = FastPathStrongLockHashPartition(locallock->hashcode);

        SpinLockAcquire(&FastPathStrongRelationLocks->mutex);
        Assert(FastPathStrongRelationLocks->count[slot] > 0);
        FastPathStrongRelationLocks->count[slot] -= 1;
        locallock->holdsStrongLockCount = FALSE;
        SpinLockRelease(&FastPathStrongRelationLocks->mutex);
    }

    /* Finally drop the hash entry itself; a miss is only worth a warning. */
    if (hash_search(LockMethodLocalHash,
                    (void *) &(locallock->tag),
                    HASH_REMOVE, NULL) == NULL)
        elog(WARNING, "locallock table corrupted");
}

/*
 * LockCheckConflicts -- test whether requested lock conflicts
 *        with those already granted
 *
 * lockMethodTable: supplies the conflict table and number of lock modes
 * lockmode: the mode being requested
 * lock: shared lock object the request is made against
 * proclock: the requester's PROCLOCK for this lock
 * proc: the requesting process (consulted only in XCP builds, to recognize
 *        other backends of the same distributed session)
 *
 * Returns STATUS_FOUND if conflict, STATUS_OK if no conflict.
 *
 * NOTES:
 *        Here's what makes this complicated: one process's locks don't
 * conflict with one another, no matter what purpose they are held for
 * (eg, session and transaction locks do not conflict).  Nor do the locks
 * of one process in a lock group conflict with those of another process in
 * the same group.  So, we must subtract off these locks when determining
 * whether the requested new lock conflicts with those already held.
 *
 * In XCP builds, locks held by backends with the same coordPid/coordId as
 * the requester are likewise not treated as conflicts.
 */
int
LockCheckConflicts(LockMethod lockMethodTable,
                   LOCKMODE lockmode,
                   LOCK *lock,
                   PROCLOCK *proclock,
                   PGPROC *proc)
{// #lizard forgives
    int            numLockModes = lockMethodTable->numLockModes;
    LOCKMASK    myLocks;
    int            conflictMask = lockMethodTable->conflictTab[lockmode];
    int            conflictsRemaining[MAX_LOCKMODES];
    int            totalConflictsRemaining = 0;
    int            i;
    SHM_QUEUE  *procLocks;
    PROCLOCK   *otherproclock;

    /*
     * first check for global conflicts: If no locks conflict with my request,
     * then I get the lock.
     *
     * Checking for conflict: lock->grantMask represents the types of
     * currently held locks.  conflictTable[lockmode] has a bit set for each
     * type of lock that conflicts with request.   Bitwise compare tells if
     * there is a conflict.
     */
    if (!(conflictMask & lock->grantMask))
    {
        PROCLOCK_PRINT("LockCheckConflicts: no conflict", proclock);
        return STATUS_OK;
    }

    /*
     * Rats.  Something conflicts.  But it could still be my own lock, or a
     * lock held by another member of my locking group.  First, figure out how
     * many conflicts remain after subtracting out any locks I hold myself.
     */
    myLocks = proclock->holdMask;
    for (i = 1; i <= numLockModes; i++)
    {
        if ((conflictMask & LOCKBIT_ON(i)) == 0)
        {
            /* Mode i does not conflict with the request at all. */
            conflictsRemaining[i] = 0;
            continue;
        }
        conflictsRemaining[i] = lock->granted[i];
        if (myLocks & LOCKBIT_ON(i))
            --conflictsRemaining[i];
        totalConflictsRemaining += conflictsRemaining[i];
    }

    /* If no conflicts remain, we get the lock. */
    if (totalConflictsRemaining == 0)
    {
        PROCLOCK_PRINT("LockCheckConflicts: resolved (simple)", proclock);
        return STATUS_OK;
    }

#ifdef XCP
    /*
     * So the lock is conflicting with locks held by some other backend.
     * But the backend may belong to the same distributed session. We need to
     * detect such cases and either allow the lock or throw error, because
     * waiting for the lock most probably would cause deadlock.
     */
    LWLockAcquire(ProcArrayLock, LW_SHARED);
    if (proc->coordPid > 0)
    {
        /*
         * Per-mode count of holders belonging to our distributed session.
         * Sized like conflictsRemaining[] (indexed 1..numLockModes) rather
         * than as a variable-length array.
         */
        int            myHolding[MAX_LOCKMODES];
        LOCKMASK    otherLocks = 0;
        PROCLOCK   *nextplock;

        /* Initialize the counters */
        for (i = 1; i <= numLockModes; i++)
            myHolding[i] = 0;

        /* Iterate over processes associated with the lock */
        procLocks = &(lock->procLocks);

        nextplock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
                                              offsetof(PROCLOCK, lockLink));
        while (nextplock)
        {
            PGPROC       *nextproc = nextplock->tag.myProc;

            if (nextproc->coordPid == proc->coordPid &&
                nextproc->coordId == proc->coordId)
            {
                /*
                 * The process belongs to same distributed session, count
                 * the modes it holds.  (Our own proclock matches too.)
                 */
                LOCKMASK    friendLocks = nextplock->holdMask;

                for (i = 1; i <= numLockModes; i++)
                    myHolding[i] += ((friendLocks & LOCKBIT_ON(i)) ? 1 : 0);
            }
            /* get next proclock */
            nextplock = (PROCLOCK *)
                SHMQueueNext(procLocks, &nextplock->lockLink,
                             offsetof(PROCLOCK, lockLink));
        }

        /*
         * Summarize locks held by other processes: a mode counts as held by
         * an outsider when more grants exist than our session accounts for.
         */
        for (i = 1; i <= numLockModes; i++)
        {
            if (lock->granted[i] > myHolding[i])
                otherLocks |= LOCKBIT_ON(i);
        }

        /*
         * Yet another check: does the request conflict with anything held
         * outside our distributed session?
         */
        if (!(conflictMask & otherLocks))
        {
            LWLockRelease(ProcArrayLock);
            /* no conflict. OK to get the lock */
            PROCLOCK_PRINT("LockCheckConflicts: resolved as held by friend",
                           proclock);
#ifdef LOCK_DEBUG
            elog(LOG, "Allow lock as held by the same distributed session [%u,%u] %s",
                 lock->tag.locktag_field1, lock->tag.locktag_field2,
                 lockMethodTable->lockModeNames[lockmode]);
#endif
            return STATUS_OK;
        }
    }
    LWLockRelease(ProcArrayLock);
#endif

    /* If no group locking, it's definitely a conflict. */
    if (proclock->groupLeader == MyProc && MyProc->lockGroupLeader == NULL)
    {
        Assert(proclock->tag.myProc == MyProc);
        PROCLOCK_PRINT("LockCheckConflicts: conflicting (simple)",
                       proclock);
        return STATUS_FOUND;
    }

    /*
     * Locks held in conflicting modes by members of our own lock group are
     * not real conflicts; we can subtract those out and see if we still have
     * a conflict.  This is O(N) in the number of processes holding or
     * awaiting locks on this object.  We could improve that by making the
     * shared memory state more complex (and larger) but it doesn't seem worth
     * it.
     */
    procLocks = &(lock->procLocks);
    otherproclock = (PROCLOCK *)
        SHMQueueNext(procLocks, procLocks, offsetof(PROCLOCK, lockLink));
    while (otherproclock != NULL)
    {
        if (proclock != otherproclock &&
            proclock->groupLeader == otherproclock->groupLeader &&
            (otherproclock->holdMask & conflictMask) != 0)
        {
            int            intersectMask = otherproclock->holdMask & conflictMask;

            for (i = 1; i <= numLockModes; i++)
            {
                if ((intersectMask & LOCKBIT_ON(i)) != 0)
                {
                    /* A group member holds mode i, so a grant must remain. */
                    if (conflictsRemaining[i] <= 0)
                        elog(PANIC, "proclocks held do not match lock");
                    conflictsRemaining[i]--;
                    totalConflictsRemaining--;
                }
            }

            if (totalConflictsRemaining == 0)
            {
                PROCLOCK_PRINT("LockCheckConflicts: resolved (group)",
                               proclock);
                return STATUS_OK;
            }
        }
        otherproclock = (PROCLOCK *)
            SHMQueueNext(procLocks, &otherproclock->lockLink,
                         offsetof(PROCLOCK, lockLink));
    }


    /* Nope, it's a real conflict. */
    PROCLOCK_PRINT("LockCheckConflicts: conflicting (group)", proclock);
    return STATUS_FOUND;
}

/*
 * GrantLock -- mark a lock request as granted in the shared structures.
 *
 * Raises the lock's total and per-mode granted counts, sets the mode's bit
 * in the lock's grantMask and in the proclock's holdMask, and clears the
 * mode's bit in the lock's waitMask once the granted count for that mode
 * has caught up with its requested count.
 *
 * NOTE: a proc that was blocked must additionally be taken off the wait
 * list and have its waitLock/waitProcLock fields cleared.  This routine
 * does not do that.
 *
 * NOTE: the grant must also be reflected in the matching LOCALLOCK entry.
 * Because the lock may be granted on behalf of some other process that we
 * are waking up, that bookkeeping is left to GrantLockLocal.
 */
void
GrantLock(LOCK *lock, PROCLOCK *proclock, LOCKMODE lockmode)
{
    const LOCKMASK modeBit = LOCKBIT_ON(lockmode);

    /* Holder-side state: this proclock now holds the mode. */
    proclock->holdMask |= modeBit;

    /* Lock-side state: one more grant, overall and for this mode. */
    lock->grantMask |= modeBit;
    lock->granted[lockmode] += 1;
    lock->nGranted += 1;

    /* Every request for this mode is granted: clear its waitMask bit. */
    if (lock->requested[lockmode] == lock->granted[lockmode])
        lock->waitMask &= LOCKBIT_OFF(lockmode);

    LOCK_PRINT("GrantLock", lock, lockmode);
    Assert((lock->granted[lockmode] > 0) && (lock->nGranted > 0));
    Assert(lock->nRequested >= lock->nGranted);
}

/*
 * UnGrantLock -- inverse of GrantLock.
 *
 * Adjusts the lock and proclock structures so that the given mode is
 * recorded as neither held nor requested by the current holder.
 *
 * The result is true when the released mode conflicts with a mode present
 * in the lock's waitMask, i.e. when the caller should go on to wake
 * waiters with ProcLockWakeup.
 */
static bool
UnGrantLock(LOCK *lock, LOCKMODE lockmode,
            PROCLOCK *proclock, LockMethod lockMethodTable)
{
    bool        mustWake;

    Assert((lock->requested[lockmode] > 0) && (lock->nRequested > 0));
    Assert((lock->granted[lockmode] > 0) && (lock->nGranted > 0));
    Assert(lock->nRequested >= lock->nGranted);

    /*
     * Take this hold out of the lock-wide counters: it stops being both a
     * grant and a request.
     */
    lock->granted[lockmode] -= 1;
    lock->nGranted -= 1;
    lock->requested[lockmode] -= 1;
    lock->nRequested -= 1;

    /* That was the last grant of this mode: drop it from the grant mask. */
    if (lock->granted[lockmode] == 0)
        lock->grantMask &= LOCKBIT_OFF(lockmode);

    LOCK_PRINT("UnGrantLock: updated", lock, lockmode);

    /*
     * ProcLockWakeup is worthwhile only if the mode just released conflicts
     * with something a waiter has asked for; if it doesn't, whatever made
     * the waiters block is still in place.  NOTE: before MVCC the wakeup
     * could be skipped whenever lock->granted[lockmode] stayed positive.
     * That shortcut is no longer valid, since the grants that remain may
     * belong to a waiter, and a waiter never conflicts with its own locks.
     */
    mustWake = (lockMethodTable->conflictTab[lockmode] & lock->waitMask) != 0;

    /* Lastly, clear the mode from the holder's own mask. */
    proclock->holdMask &= LOCKBIT_OFF(lockmode);
    PROCLOCK_PRINT("UnGrantLock: updated", proclock);

    return mustWake;
}

/*
 * CleanUpLock -- tidy up shared state after a lock has been released.
 *
 * Removes the proclock when it no longer holds anything, removes the lock
 * object when nothing requests it any more, and otherwise calls
 * ProcLockWakeup if the caller asked for it.  (The usual pattern is to call
 * this right after UnGrantLock, passing UnGrantLock's result as
 * wakeupNeeded.)
 *
 * The appropriate partition lock must be held at entry, and will be
 * held at exit.
 */
static void
CleanUpLock(LOCK *lock, PROCLOCK *proclock,
            LockMethod lockMethodTable, uint32 hashcode,
            bool wakeupNeeded)
{
    /* A proclock that holds no mode at all is of no further use. */
    if (proclock->holdMask == 0)
    {
        uint32        plHash = ProcLockHashCode(&proclock->tag, hashcode);
        void       *removed;

        PROCLOCK_PRINT("CleanUpLock: deleting", proclock);

        /* Unlink it from both queues before dropping the hash entry. */
        SHMQueueDelete(&proclock->lockLink);
        SHMQueueDelete(&proclock->procLink);

        removed = hash_search_with_hash_value(LockMethodProcLockHash,
                                              (void *) &(proclock->tag),
                                              plHash,
                                              HASH_REMOVE,
                                              NULL);
        if (removed == NULL)
            elog(PANIC, "proclock table corrupted");
    }

    /* Requests remain on the lock: keep it, waking waiters if asked to. */
    if (lock->nRequested != 0)
    {
        if (wakeupNeeded)
            ProcLockWakeup(lockMethodTable, lock);
        return;
    }

    /* Nothing requests this lock any more, so reclaim the lock object. */
    LOCK_PRINT("CleanUpLock: deleting", lock, 0);
    Assert(SHMQueueEmpty(&(lock->procLocks)));
    if (hash_search_with_hash_value(LockMethodLockHash,
                                    (void *) &(lock->tag),
                                    hashcode,
                                    HASH_REMOVE,
                                    NULL) == NULL)
        elog(PANIC, "lock table corrupted");
}

/*
 * GrantLockLocal -- record a granted lock request in the LOCALLOCK entry.
 *
 * Bumps the entry's total hold count and the hold count of the given owner,
 * appending a new owner slot (and telling the resource owner about the
 * lock) when this owner has no slot yet.
 *
 * The caller is expected to have made sure the owner array has room for one
 * more entry.
 */
static void
GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner)
{
    LOCALLOCKOWNER *slot;
    int            remaining;

    Assert(locallock->maxLockOwners > locallock->numLockOwners);

    /* Total holds of this lock, across all owners. */
    locallock->nLocks += 1;

    /* Look for a slot already belonging to this owner. */
    slot = locallock->lockOwners;
    for (remaining = locallock->numLockOwners; remaining > 0; remaining--)
    {
        if (slot->owner == owner)
        {
            slot->nLocks += 1;
            return;
        }
        slot++;
    }

    /* No match: slot now addresses the first unused array element. */
    slot->owner = owner;
    slot->nLocks = 1;
    locallock->numLockOwners += 1;

    /* A NULL owner has no resource owner to inform. */
    if (owner != NULL)
        ResourceOwnerRememberLock(owner, locallock);
}

/*
 * BeginStrongLockAcquire -- start a strong lock acquisition for a LOCALLOCK
 *
 * Bumps the shared strong-lock counter for the given fast-path partition,
 * flags the LOCALLOCK as holding that count, and remembers the LOCALLOCK in
 * StrongLockInProgress so that error cleanup can undo the bump.
 */
static void
BeginStrongLockAcquire(LOCALLOCK *locallock, uint32 fasthashcode)
{
    Assert(!locallock->holdsStrongLockCount);
    Assert(StrongLockInProgress == NULL);

    /*
     * A plain increment of a memory location is not atomic, hence the
     * spinlock: it keeps us from racing with another process bumping the
     * same counter.
     *
     * XXX: an atomic fetch-and-add instruction might be worth using here on
     * architectures that provide one.
     */
    SpinLockAcquire(&FastPathStrongRelationLocks->mutex);
    FastPathStrongRelationLocks->count[fasthashcode] += 1;
    StrongLockInProgress = locallock;
    locallock->holdsStrongLockCount = TRUE;
    SpinLockRelease(&FastPathStrongRelationLocks->mutex);
}

/*
 * FinishStrongLockAcquire - cancel pending cleanup for a strong lock
 * acquisition once it's no longer needed
 *
 * Only StrongLockInProgress is reset; the shared strong-lock counter and
 * the LOCALLOCK's holdsStrongLockCount flag are left as they are.  Once
 * the pointer is NULL, AbortStrongLockAcquire() returns without doing
 * anything.
 */
static void
FinishStrongLockAcquire(void)
{
    StrongLockInProgress = NULL;
}

/*
 * AbortStrongLockAcquire - undo strong lock state changes performed by
 * BeginStrongLockAcquire.
 *
 * Does nothing when no strong lock acquisition is in progress.  Otherwise
 * the strong-lock counter for the locallock's partition is decremented, the
 * locallock's holdsStrongLockCount flag is cleared, and
 * StrongLockInProgress is reset.
 */
void
AbortStrongLockAcquire(void)
{
    uint32        fasthashcode;
    LOCALLOCK  *locallock = StrongLockInProgress;

    /* Nothing pending, so nothing to undo */
    if (locallock == NULL)
        return;

    /* Recompute the counter slot from the locallock's hash code */
    fasthashcode = FastPathStrongLockHashPartition(locallock->hashcode);
    Assert(locallock->holdsStrongLockCount == TRUE);
    /* The counter is shared, so modify it only while holding the spinlock */
    SpinLockAcquire(&FastPathStrongRelationLocks->mutex);
    Assert(FastPathStrongRelationLocks->count[fasthashcode] > 0);
    FastPathStrongRelationLocks->count[fasthashcode]--;
    locallock->holdsStrongLockCount = FALSE;
    StrongLockInProgress = NULL;
    SpinLockRelease(&FastPathStrongRelationLocks->mutex);
}

/*
 * GrantAwaitedLock -- call GrantLockLocal for the lock we are doing
 *        WaitOnLock on.
 *
 * proc.c needs this for the case where we are booted off the lock by
 * timeout, but discover that someone granted us the lock anyway.
 *
 * The lock and owner used are awaitedLock and awaitedOwner, which
 * WaitOnLock sets before it goes to sleep.
 *
 * We could just export GrantLockLocal, but that would require including
 * resowner.h in lock.h, which creates circularity.
 */
void
GrantAwaitedLock(void)
{
    GrantLockLocal(awaitedLock, awaitedOwner);
}

/*
 * WaitOnLock -- wait to acquire a lock
 *
 * locallock: the local lock table entry for the lock being awaited.
 * owner: the resource owner on whose behalf we wait; it is saved in
 *        awaitedOwner for use by GrantAwaitedLock.
 *
 * Returns normally once ProcSleep reports STATUS_OK.  If ProcSleep reports
 * anything else, the partition lock is released and DeadLockReport is
 * called, which does not return.
 *
 * Caller must have set MyProc->heldLocks to reflect locks already held
 * on the lockable object by this process.
 *
 * The appropriate partition lock must be held at entry.
 */
static void
WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner)
{
    LOCKMETHODID lockmethodid = LOCALLOCK_LOCKMETHOD(*locallock);
    LockMethod    lockMethodTable = LockMethods[lockmethodid];
    char       *volatile new_status = NULL;

    LOCK_PRINT("WaitOnLock: sleeping on lock",
               locallock->lock, locallock->tag.mode);

    /* Report change to waiting status */
    if (update_process_title)
    {
        const char *old_status;
        int            len;

        old_status = get_ps_display(&len);
        /* room for the old title, the 8 characters of " waiting", and NUL */
        new_status = (char *) palloc(len + 8 + 1);
        memcpy(new_status, old_status, len);
        strcpy(new_status + len, " waiting");
        set_ps_display(new_status, false);
        /*
         * After truncation the buffer holds the original title again; it is
         * what gets displayed when the wait ends, on either exit path below.
         */
        new_status[len] = '\0'; /* truncate off " waiting" */
    }

    /* Publish what we are waiting for, for GrantAwaitedLock's benefit */
    awaitedLock = locallock;
    awaitedOwner = owner;

    /*
     * NOTE: Think not to put any shared-state cleanup after the call to
     * ProcSleep, in either the normal or failure path.  The lock state must
     * be fully set by the lock grantor, or by CheckDeadLock if we give up
     * waiting for the lock.  This is necessary because of the possibility
     * that a cancel/die interrupt will interrupt ProcSleep after someone else
     * grants us the lock, but before we've noticed it. Hence, after granting,
     * the locktable state must fully reflect the fact that we own the lock;
     * we can't do additional work on return.
     *
     * We can and do use a PG_TRY block to try to clean up after failure, but
     * this still has a major limitation: elog(FATAL) can occur while waiting
     * (eg, a "die" interrupt), and then control won't come back here. So all
     * cleanup of essential state should happen in LockErrorCleanup, not here.
     * We can use PG_TRY to clear the "waiting" status flags, since doing that
     * is unimportant if the process exits.
     */
    PG_TRY();
    {
        if (ProcSleep(locallock, lockMethodTable) != STATUS_OK)
        {
            /*
             * We failed as a result of a deadlock, see CheckDeadLock(). Quit
             * now.
             */
            awaitedLock = NULL;
            LOCK_PRINT("WaitOnLock: aborting on lock",
                       locallock->lock, locallock->tag.mode);
            LWLockRelease(LockHashPartitionLock(locallock->hashcode));

            /*
             * Now that we aren't holding the partition lock, we can give an
             * error report including details about the detected deadlock.
             */
            DeadLockReport();
            /* not reached */
        }
    }
    PG_CATCH();
    {
        /* In this path, awaitedLock remains set until LockErrorCleanup */

        /* Report change to non-waiting status */
        if (update_process_title)
        {
            set_ps_display(new_status, false);
            pfree(new_status);
        }

        /* and propagate the error */
        PG_RE_THROW();
    }
    PG_END_TRY();

    /* Normal exit: we are no longer waiting for anything */
    awaitedLock = NULL;

    /* Report change to non-waiting status */
    if (update_process_title)
    {
        set_ps_display(new_status, false);
        pfree(new_status);
    }

    LOCK_PRINT("WaitOnLock: wakeup on lock",
               locallock->lock, locallock->tag.mode);
}

/*
 * Remove a proc from the wait-queue it is on (caller must know it is on one).
 * This is only used when the proc has failed to get the lock, so we set its
 * waitStatus to STATUS_ERROR.
 *
 * proc: the waiting process to dequeue; its waitLock, waitProcLock and
 *        waitLockMode fields identify the request being abandoned.
 * hashcode: hash code for the awaited lock, handed through to CleanUpLock.
 *
 * Appropriate partition lock must be held by caller.  Also, caller is
 * responsible for signaling the proc if needed.
 *
 * NB: this does not clean up any locallock object that may exist for the lock.
 */
void
RemoveFromWaitQueue(PGPROC *proc, uint32 hashcode)
{
    LOCK       *waitLock = proc->waitLock;
    PROCLOCK   *proclock = proc->waitProcLock;
    LOCKMODE    lockmode = proc->waitLockMode;
    LOCKMETHODID lockmethodid;

    /* Make sure proc is waiting */
    Assert(proc->waitStatus == STATUS_WAITING);
    Assert(proc->links.next != NULL);
    Assert(waitLock);
    Assert(waitLock->waitProcs.size > 0);

    /*
     * Only look inside the lock after asserting that the pointer is valid;
     * fetching the lock method any earlier would make Assert(waitLock)
     * pointless.
     */
    lockmethodid = LOCK_LOCKMETHOD(*waitLock);
    Assert(0 < lockmethodid && lockmethodid < lengthof(LockMethods));

    /* Remove proc from lock's wait queue */
    SHMQueueDelete(&(proc->links));
    waitLock->waitProcs.size--;

    /* Undo increments of request counts by waiting process */
    Assert(waitLock->nRequested > 0);
    Assert(waitLock->nRequested > waitLock->nGranted);
    waitLock->nRequested--;
    Assert(waitLock->requested[lockmode] > 0);
    waitLock->requested[lockmode]--;
    /* don't forget to clear waitMask bit if appropriate */
    if (waitLock->granted[lockmode] == waitLock->requested[lockmode])
        waitLock->waitMask &= LOCKBIT_OFF(lockmode);

    /* Clean up the proc's own state, and pass it the ok/fail signal */
    proc->waitLock = NULL;
    proc->waitProcLock = NULL;
    proc->waitStatus = STATUS_ERROR;

    /*
     * Delete the proclock immediately if it represents no already-held locks.
     * (This must happen now because if the owner of the lock decides to
     * release it, and the requested/granted counts then go to zero,
     * LockRelease expects there to be no remaining proclocks.) Then see if
     * any other waiters for the lock can be woken up now.
     */
    CleanUpLock(waitLock, proclock,
                LockMethods[lockmethodid], hashcode,
                true);
}

/*
 * LockRelease -- look up 'locktag' and release one 'lockmode' lock on it.
 *        Release a session lock if 'sessionLock' is true, else release a
 *        regular transaction lock.
 *
 * Returns TRUE if one hold was released (whether or not that was the last
 * hold on the lock).  Returns FALSE, after emitting a WARNING, if the
 * identified owner does not hold such a lock.  An unrecognized lock method
 * or lock mode, or failure to re-find the shared lock objects, raises ERROR.
 *
 * Side Effects: find any waiting processes that are now wakable,
 *        grant them their requested locks and awaken them.
 *        (We have to grant the lock here to avoid a race between
 *        the waking process and any new process to
 *        come along and request the lock.)
 */
bool
LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
{// #lizard forgives
    LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
    LockMethod    lockMethodTable;
    LOCALLOCKTAG localtag;
    LOCALLOCK  *locallock;
    LOCK       *lock;
    PROCLOCK   *proclock;
    LWLock       *partitionLock;
    bool        wakeupNeeded;

    /* Validate the lock method and mode before consulting any table */
    if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
        elog(ERROR, "unrecognized lock method: %d", lockmethodid);
    lockMethodTable = LockMethods[lockmethodid];
    if (lockmode <= 0 || lockmode > lockMethodTable->numLockModes)
        elog(ERROR, "unrecognized lock mode: %d", lockmode);

#ifdef LOCK_DEBUG
    if (LOCK_DEBUG_ENABLED(locktag))
        elog(LOG, "LockRelease: lock [%u,%u] %s",
             locktag->locktag_field1, locktag->locktag_field2,
             lockMethodTable->lockModeNames[lockmode]);
#endif

    /*
     * Find the LOCALLOCK entry for this lock and lockmode
     */
    MemSet(&localtag, 0, sizeof(localtag)); /* must clear padding */
    localtag.lock = *locktag;
    localtag.mode = lockmode;

    locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash,
                                          (void *) &localtag,
                                          HASH_FIND, NULL);

    /*
     * let the caller print its own error message, too. Do not ereport(ERROR).
     */
    if (!locallock || locallock->nLocks <= 0)
    {
        elog(WARNING, "you don't own a lock of type %s",
             lockMethodTable->lockModeNames[lockmode]);
        return FALSE;
    }

    /*
     * Decrease the count for the resource owner.
     */
    {
        LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
        ResourceOwner owner;
        int            i;

        /* Identify owner for lock */
        if (sessionLock)
            owner = NULL;
        else
            owner = CurrentResourceOwner;

        /* Scan the owner array backwards for this owner's slot */
        for (i = locallock->numLockOwners - 1; i >= 0; i--)
        {
            if (lockOwners[i].owner == owner)
            {
                Assert(lockOwners[i].nLocks > 0);
                if (--lockOwners[i].nLocks == 0)
                {
                    /* that was the owner's last hold: drop its slot */
                    if (owner != NULL)
                        ResourceOwnerForgetLock(owner, locallock);
                    /* compact out unused slot */
                    locallock->numLockOwners--;
                    if (i < locallock->numLockOwners)
                        lockOwners[i] = lockOwners[locallock->numLockOwners];
                }
                break;
            }
        }
        /* i < 0 means the loop ran off the array without a match */
        if (i < 0)
        {
            /* don't release a lock belonging to another owner */
            elog(WARNING, "you don't own a lock of type %s",
                 lockMethodTable->lockModeNames[lockmode]);
            return FALSE;
        }
    }

    /*
     * Decrease the total local count.  If we're still holding the lock, we're
     * done.
     */
    locallock->nLocks--;

    if (locallock->nLocks > 0)
        return TRUE;

    /* Attempt fast release of any lock eligible for the fast path. */
    if (EligibleForRelationFastPath(locktag, lockmode) &&
        FastPathLocalUseCount > 0)
    {
        bool        released;

        /*
         * We might not find the lock here, even if we originally entered it
         * here.  Another backend may have moved it to the main table.
         */
        LWLockAcquire(&MyProc->backendLock, LW_EXCLUSIVE);
        released = FastPathUnGrantRelationLock(locktag->locktag_field2,
                                               lockmode);
        LWLockRelease(&MyProc->backendLock);
        if (released)
        {
            /* fast-path release succeeded; only the local entry remains */
            RemoveLocalLock(locallock);
            return TRUE;
        }
    }

    /*
     * Otherwise we've got to mess with the shared lock table.
     */
    partitionLock = LockHashPartitionLock(locallock->hashcode);

    LWLockAcquire(partitionLock, LW_EXCLUSIVE);

    /*
     * Normally, we don't need to re-find the lock or proclock, since we kept
     * their addresses in the locallock table, and they couldn't have been
     * removed while we were holding a lock on them.  But it's possible that
     * the lock was taken fast-path and has since been moved to the main hash
     * table by another backend, in which case we will need to look up the
     * objects here.  We assume the lock field is NULL if so.
     */
    lock = locallock->lock;
    if (!lock)
    {
        PROCLOCKTAG proclocktag;

        Assert(EligibleForRelationFastPath(locktag, lockmode));
        lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
                                                    (const void *) locktag,
                                                    locallock->hashcode,
                                                    HASH_FIND,
                                                    NULL);
        if (!lock)
            elog(ERROR, "failed to re-find shared lock object");
        locallock->lock = lock;

        /* the proclock is keyed by the (lock, process) pair */
        proclocktag.myLock = lock;
        proclocktag.myProc = MyProc;
        locallock->proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash,
                                                       (void *) &proclocktag,
                                                       HASH_FIND,
                                                       NULL);
        if (!locallock->proclock)
            elog(ERROR, "failed to re-find shared proclock object");
    }
    LOCK_PRINT("LockRelease: found", lock, lockmode);
    proclock = locallock->proclock;
    PROCLOCK_PRINT("LockRelease: found", proclock);

    /*
     * Double-check that we are actually holding a lock of the type we want to
     * release.
     */
    if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
    {
        PROCLOCK_PRINT("LockRelease: WRONGTYPE", proclock);
        /* release the partition lock before issuing the warning */
        LWLockRelease(partitionLock);
        elog(WARNING, "you don't own a lock of type %s",
             lockMethodTable->lockModeNames[lockmode]);
        RemoveLocalLock(locallock);
        return FALSE;
    }

    /*
     * Do the releasing.  CleanUpLock will waken any now-wakable waiters.
     */
    wakeupNeeded = UnGrantLock(lock, lockmode, proclock, lockMethodTable);

    CleanUpLock(lock, proclock,
                lockMethodTable, locallock->hashcode,
                wakeupNeeded);

    LWLockRelease(partitionLock);

    /* shared state is up to date; now discard the local entry */
    RemoveLocalLock(locallock);
    return TRUE;
}

/*
 * LockReleaseAll -- Release all locks of the specified lock method that
 *        are held by the current process.
 *
 * Well, not necessarily *all* locks.  The available behaviors are:
 *        allLocks == true: release all locks including session locks.
 *        allLocks == false: release all non-session locks.
 *
 * An unrecognized lockmethodid raises ERROR.  The work is done in two
 * passes: first over the backend-local locallock table (which also handles
 * fast-path locks), then over this process's proclock lists, one lock
 * partition at a time.
 */
void
LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks)
{// #lizard forgives
    HASH_SEQ_STATUS status;
    LockMethod    lockMethodTable;
    int            i,
                numLockModes;
    LOCALLOCK  *locallock;
    LOCK       *lock;
    PROCLOCK   *proclock;
    int            partition;
    bool        have_fast_path_lwlock = false;

    if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
        elog(ERROR, "unrecognized lock method: %d", lockmethodid);
    lockMethodTable = LockMethods[lockmethodid];

#ifdef LOCK_DEBUG
    if (*(lockMethodTable->trace_flag))
        elog(LOG, "LockReleaseAll: lockmethod=%d", lockmethodid);
#endif

    /*
     * Get rid of our fast-path VXID lock, if appropriate.  Note that this is
     * the only way that the lock we hold on our own VXID can ever get
     * released: it is always and only released when a toplevel transaction
     * ends.
     */
    if (lockmethodid == DEFAULT_LOCKMETHOD)
        VirtualXactLockTableCleanup();

    numLockModes = lockMethodTable->numLockModes;

    /*
     * First we run through the locallock table and get rid of unwanted
     * entries, then we scan the process's proclocks and get rid of those. We
     * do this separately because we may have multiple locallock entries
     * pointing to the same proclock, and we daren't end up with any dangling
     * pointers.  Fast-path locks are cleaned up during the locallock table
     * scan, though.
     */
    hash_seq_init(&status, LockMethodLocalHash);

    while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
    {
        /*
         * If the LOCALLOCK entry is unused, we must've run out of shared
         * memory while trying to set up this lock.  Just forget the local
         * entry.
         */
        if (locallock->nLocks == 0)
        {
            RemoveLocalLock(locallock);
            continue;
        }

        /* Ignore items that are not of the lockmethod to be removed */
        if (LOCALLOCK_LOCKMETHOD(*locallock) != lockmethodid)
            continue;

        /*
         * If we are asked to release all locks, we can just zap the entry.
         * Otherwise, must scan to see if there are session locks. We assume
         * there is at most one lockOwners entry for session locks.
         */
        if (!allLocks)
        {
            LOCALLOCKOWNER *lockOwners = locallock->lockOwners;

            /* If session lock is above array position 0, move it down to 0 */
            for (i = 0; i < locallock->numLockOwners; i++)
            {
                if (lockOwners[i].owner == NULL)
                    lockOwners[0] = lockOwners[i];
                else
                    ResourceOwnerForgetLock(lockOwners[i].owner, locallock);
            }

            /* Slot 0 has a NULL owner only if a session lock was found */
            if (locallock->numLockOwners > 0 &&
                lockOwners[0].owner == NULL &&
                lockOwners[0].nLocks > 0)
            {
                /* Fix the locallock to show just the session locks */
                locallock->nLocks = lockOwners[0].nLocks;
                locallock->numLockOwners = 1;
                /* We aren't deleting this locallock, so done */
                continue;
            }
            else
                locallock->numLockOwners = 0;
        }

        /*
         * If the lock or proclock pointers are NULL, this lock was taken via
         * the relation fast-path (and is not known to have been transferred).
         */
        if (locallock->proclock == NULL || locallock->lock == NULL)
        {
            LOCKMODE    lockmode = locallock->tag.mode;
            Oid            relid;

            /* Verify that a fast-path lock is what we've got. */
            if (!EligibleForRelationFastPath(&locallock->tag.lock, lockmode))
                elog(PANIC, "locallock table corrupted");

            /*
             * If we don't currently hold the LWLock that protects our
             * fast-path data structures, we must acquire it before attempting
             * to release the lock via the fast-path.  We will continue to
             * hold the LWLock until we're done scanning the locallock table,
             * unless we hit a transferred fast-path lock.  (XXX is this
             * really such a good idea?  There could be a lot of entries ...)
             */
            if (!have_fast_path_lwlock)
            {
                LWLockAcquire(&MyProc->backendLock, LW_EXCLUSIVE);
                have_fast_path_lwlock = true;
            }

            /* Attempt fast-path release. */
            relid = locallock->tag.lock.locktag_field2;
            if (FastPathUnGrantRelationLock(relid, lockmode))
            {
                RemoveLocalLock(locallock);
                continue;
            }

            /*
             * Our lock, originally taken via the fast path, has been
             * transferred to the main lock table.  That's going to require
             * some extra work, so release our fast-path lock before starting.
             */
            LWLockRelease(&MyProc->backendLock);
            have_fast_path_lwlock = false;

            /*
             * Now dump the lock.  We haven't got a pointer to the LOCK or
             * PROCLOCK in this case, so we have to handle this a bit
             * differently than a normal lock release.  Unfortunately, this
             * requires an extra LWLock acquire-and-release cycle on the
             * partitionLock, but hopefully it shouldn't happen often.
             */
            LockRefindAndRelease(lockMethodTable, MyProc,
                                 &locallock->tag.lock, lockmode, false);
            RemoveLocalLock(locallock);
            continue;
        }

        /* Mark the proclock to show we need to release this lockmode */
        if (locallock->nLocks > 0)
            locallock->proclock->releaseMask |= LOCKBIT_ON(locallock->tag.mode);

        /* And remove the locallock hashtable entry */
        RemoveLocalLock(locallock);
    }

    /* Done with the fast-path data structures */
    if (have_fast_path_lwlock)
        LWLockRelease(&MyProc->backendLock);

    /*
     * Now, scan each lock partition separately.
     */
    for (partition = 0; partition < NUM_LOCK_PARTITIONS; partition++)
    {
        LWLock       *partitionLock;
        SHM_QUEUE  *procLocks = &(MyProc->myProcLocks[partition]);
        PROCLOCK   *nextplock;

        partitionLock = LockHashPartitionLockByIndex(partition);

        /*
         * If the proclock list for this partition is empty, we can skip
         * acquiring the partition lock.  This optimization is trickier than
         * it looks, because another backend could be in process of adding
         * something to our proclock list due to promoting one of our
         * fast-path locks.  However, any such lock must be one that we
         * decided not to delete above, so it's okay to skip it again now;
         * we'd just decide not to delete it again.  We must, however, be
         * careful to re-fetch the list header once we've acquired the
         * partition lock, to be sure we have a valid, up-to-date pointer.
         * (There is probably no significant risk if pointer fetch/store is
         * atomic, but we don't wish to assume that.)
         *
         * XXX This argument assumes that the locallock table correctly
         * represents all of our fast-path locks.  While allLocks mode
         * guarantees to clean up all of our normal locks regardless of the
         * locallock situation, we lose that guarantee for fast-path locks.
         * This is not ideal.
         */
        if (SHMQueueNext(procLocks, procLocks,
                         offsetof(PROCLOCK, procLink)) == NULL)
            continue;            /* needn't examine this partition */

        LWLockAcquire(partitionLock, LW_EXCLUSIVE);

        /* The list head is fetched again here, now under the partition lock */
        for (proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
                                                  offsetof(PROCLOCK, procLink));
             proclock;
             proclock = nextplock)
        {
            bool        wakeupNeeded = false;

            /* Get link first, since we may unlink/delete this proclock */
            nextplock = (PROCLOCK *)
                SHMQueueNext(procLocks, &proclock->procLink,
                             offsetof(PROCLOCK, procLink));

            Assert(proclock->tag.myProc == MyProc);

            lock = proclock->tag.myLock;

            /* Ignore items that are not of the lockmethod to be removed */
            if (LOCK_LOCKMETHOD(*lock) != lockmethodid)
                continue;

            /*
             * In allLocks mode, force release of all locks even if locallock
             * table had problems
             */
            if (allLocks)
                proclock->releaseMask = proclock->holdMask;
            else
                Assert((proclock->releaseMask & ~proclock->holdMask) == 0);

            /*
             * Ignore items that have nothing to be released, unless they have
             * holdMask == 0 and are therefore recyclable
             */
            if (proclock->releaseMask == 0 && proclock->holdMask != 0)
                continue;

            PROCLOCK_PRINT("LockReleaseAll", proclock);
            LOCK_PRINT("LockReleaseAll", lock, 0);
            Assert(lock->nRequested >= 0);
            Assert(lock->nGranted >= 0);
            Assert(lock->nGranted <= lock->nRequested);
            Assert((proclock->holdMask & ~lock->grantMask) == 0);

            /*
             * Release the previously-marked lock modes
             */
            for (i = 1; i <= numLockModes; i++)
            {
                if (proclock->releaseMask & LOCKBIT_ON(i))
                    wakeupNeeded |= UnGrantLock(lock, i, proclock,
                                                lockMethodTable);
            }
            Assert((lock->nRequested >= 0) && (lock->nGranted >= 0));
            Assert(lock->nGranted <= lock->nRequested);
            LOCK_PRINT("LockReleaseAll: updated", lock, 0);

            /* every marked mode has been handled, so clear the marks */
            proclock->releaseMask = 0;

            /* CleanUpLock will wake up waiters if needed. */
            CleanUpLock(lock, proclock,
                        lockMethodTable,
                        LockTagHashCode(&lock->tag),
                        wakeupNeeded);
        }                        /* loop over PROCLOCKs within this partition */

        LWLockRelease(partitionLock);
    }                            /* loop over partitions */

#ifdef LOCK_DEBUG
    if (*(lockMethodTable->trace_flag))
        elog(LOG, "LockReleaseAll done");
#endif
}

/*
 * LockReleaseSession -- drop every session-level lock of the given lock
 *        method that the current process holds.
 *
 * The backend-local lock table is scanned and each entry of the requested
 * lock method is handed to ReleaseLockIfHeld in session-lock mode.  An
 * unrecognized lockmethodid raises ERROR.
 */
void
LockReleaseSession(LOCKMETHODID lockmethodid)
{
    HASH_SEQ_STATUS scan;
    LOCALLOCK  *entry;

    if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
        elog(ERROR, "unrecognized lock method: %d", lockmethodid);

    hash_seq_init(&scan, LockMethodLocalHash);

    for (;;)
    {
        entry = (LOCALLOCK *) hash_seq_search(&scan);
        if (entry == NULL)
            break;                /* scan exhausted */

        /* Only entries of the requested lock method are of interest */
        if (LOCALLOCK_LOCKMETHOD(*entry) == lockmethodid)
            ReleaseLockIfHeld(entry, true);
    }
}

/*
 * LockReleaseCurrentOwner
 *        Release every lock that belongs to CurrentResourceOwner.
 *
 * A caller that already knows which locks those are may pass them in
 * 'locallocks' (with 'nlocks' entries); that is much faster when many locks
 * are held.  With locallocks == NULL, the whole local lock hash table is
 * traversed instead to find them.
 */
void
LockReleaseCurrentOwner(LOCALLOCK **locallocks, int nlocks)
{
    if (locallocks != NULL)
    {
        /* Caller supplied the list: walk it from the last entry backwards */
        int            remaining = nlocks;

        while (--remaining >= 0)
            ReleaseLockIfHeld(locallocks[remaining], false);
    }
    else
    {
        /* No list given: visit every entry of the local lock table */
        HASH_SEQ_STATUS scan;
        LOCALLOCK  *entry;

        hash_seq_init(&scan, LockMethodLocalHash);

        for (entry = (LOCALLOCK *) hash_seq_search(&scan);
             entry != NULL;
             entry = (LOCALLOCK *) hash_seq_search(&scan))
            ReleaseLockIfHeld(entry, false);
    }
}

/*
 * ReleaseLockIfHeld
 *        With sessionLock true, release any session-level holds on this
 *        lockable object; otherwise release whatever CurrentResourceOwner
 *        holds on it.  Does nothing if the target owner holds nothing.
 *
 * Passing a ResourceOwner pointer (or NULL for session locks) would be the
 * natural interface, but LockRelease() can only release locks belonging to
 * CurrentResourceOwner, so that would need LockRelease() to be refactored
 * first.  Such a refactoring should also get rid of the hashtable lookup of
 * the locallock.  So far this function is not used heavily enough to make
 * that worthwhile.
 */
static void
ReleaseLockIfHeld(LOCALLOCK *locallock, bool sessionLock)
{
    /* Owner selection must match what LockRelease does! */
    ResourceOwner target = sessionLock ? NULL : CurrentResourceOwner;
    LOCALLOCKOWNER *owners = locallock->lockOwners;
    int            idx;

    /* Look for the target owner's slot, starting from the end */
    idx = locallock->numLockOwners;
    while (--idx >= 0)
    {
        if (owners[idx].owner == target)
            break;
    }
    if (idx < 0)
        return;                    /* target owner holds nothing here */

    Assert(owners[idx].nLocks > 0);

    if (owners[idx].nLocks >= locallock->nLocks)
    {
        /*
         * The target owner accounts for all holds.  Collapse the counts to
         * one so that a single LockRelease call gets rid of the lock.
         */
        Assert(owners[idx].nLocks == locallock->nLocks);
        owners[idx].nLocks = 1;
        locallock->nLocks = 1;
        if (!LockRelease(&locallock->tag.lock,
                         locallock->tag.mode,
                         sessionLock))
            elog(WARNING, "ReleaseLockIfHeld: failed??");
        return;
    }

    /*
     * Other owners still hold the lock, so it stays; just subtract this
     * owner's holds and squeeze its slot out of the array.
     */
    locallock->nLocks -= owners[idx].nLocks;
    locallock->numLockOwners--;
    if (target != NULL)
        ResourceOwnerForgetLock(target, locallock);
    if (idx < locallock->numLockOwners)
        owners[idx] = owners[locallock->numLockOwners];
}

/*
 * LockReassignCurrentOwner
 *        Hand every lock owned by CurrentResourceOwner over to its parent
 *        resource owner.
 *
 * A caller that already knows which locks those are may pass them in
 * 'locallocks' (with 'nlocks' entries); that is much faster when many locks
 * are held (e.g. pg_dump with a large schema).  With locallocks == NULL,
 * the whole local lock hash table is traversed instead to find them.
 */
void
LockReassignCurrentOwner(LOCALLOCK **locallocks, int nlocks)
{
    ResourceOwner parent = ResourceOwnerGetParent(CurrentResourceOwner);

    Assert(parent != NULL);

    if (locallocks != NULL)
    {
        /* Caller supplied the list: walk it from the last entry backwards */
        int            remaining = nlocks;

        while (--remaining >= 0)
            LockReassignOwner(locallocks[remaining], parent);
    }
    else
    {
        /* No list given: visit every entry of the local lock table */
        HASH_SEQ_STATUS scan;
        LOCALLOCK  *entry;

        hash_seq_init(&scan, LockMethodLocalHash);

        for (entry = (LOCALLOCK *) hash_seq_search(&scan);
             entry != NULL;
             entry = (LOCALLOCK *) hash_seq_search(&scan))
            LockReassignOwner(entry, parent);
    }
}

/*
 * Subroutine of LockReassignCurrentOwner.  Transfers the holds that
 * CurrentResourceOwner has on the given lock to 'parent'.  Does nothing if
 * CurrentResourceOwner has no holds on it.
 */
static void
LockReassignOwner(LOCALLOCK *locallock, ResourceOwner parent)
{
    LOCALLOCKOWNER *owners = locallock->lockOwners;
    int            childSlot = -1;
    int            parentSlot = -1;
    int            n;

    /* One backwards pass locates the slots of the child and of the parent */
    n = locallock->numLockOwners;
    while (n-- > 0)
    {
        ResourceOwner who = owners[n].owner;

        if (who == CurrentResourceOwner)
            childSlot = n;
        else if (who == parent)
            parentSlot = n;
    }

    if (childSlot < 0)
        return;                    /* no current locks */

    if (parentSlot >= 0)
    {
        /* Parent already has a slot: fold the child's count into it ... */
        owners[parentSlot].nLocks += owners[childSlot].nLocks;
        /* ... and squeeze the child's slot out of the array */
        locallock->numLockOwners--;
        if (childSlot < locallock->numLockOwners)
            owners[childSlot] = owners[locallock->numLockOwners];
    }
    else
    {
        /* Parent has no slot of its own, so it inherits the child's */
        owners[childSlot].owner = parent;
        ResourceOwnerRememberLock(parent, locallock);
    }

    ResourceOwnerForgetLock(CurrentResourceOwner, locallock);
}

/*
 * FastPathGrantRelationLock
 *        Record a lock in this backend's fast-path array, space permitting.
 *
 * Returns true if the lock mode was recorded, either in the slot already
 * used for 'relid' or in an empty slot; returns false if there is neither.
 */
static bool
FastPathGrantRelationLock(Oid relid, LOCKMODE lockmode)
{
    uint32        free_slot = FP_LOCK_SLOTS_PER_BACKEND;
    uint32        slot;

    /* Look for a slot already used for relid, noting empty ones we pass */
    for (slot = 0; slot < FP_LOCK_SLOTS_PER_BACKEND; slot++)
    {
        if (FAST_PATH_GET_BITS(MyProc, slot) == 0)
        {
            free_slot = slot;
            continue;
        }
        if (MyProc->fpRelId[slot] != relid)
            continue;

        /* Found the relation's slot: add this mode to it */
        Assert(!FAST_PATH_CHECK_LOCKMODE(MyProc, slot, lockmode));
        FAST_PATH_SET_LOCKMODE(MyProc, slot, lockmode);
        return true;
    }

    /* No slot for relid and nothing empty either: caller must cope */
    if (free_slot >= FP_LOCK_SLOTS_PER_BACKEND)
        return false;

    /* Claim the empty slot we remembered */
    MyProc->fpRelId[free_slot] = relid;
    FAST_PATH_SET_LOCKMODE(MyProc, free_slot, lockmode);
    ++FastPathLocalUseCount;
    return true;
}

/*
 * FastPathUnGrantRelationLock
 *        Remove a lock from this backend's fast-path array, if it is there.
 *
 * Returns true if the given mode was found for 'relid' and cleared.  As a
 * side effect, the backend-private FastPathLocalUseCount is recomputed as
 * the number of slots still in use.
 */
static bool
FastPathUnGrantRelationLock(Oid relid, LOCKMODE lockmode)
{
    bool        found = false;
    uint32        slot;

    FastPathLocalUseCount = 0;
    for (slot = 0; slot < FP_LOCK_SLOTS_PER_BACKEND; slot++)
    {
        if (MyProc->fpRelId[slot] == relid)
        {
            if (FAST_PATH_CHECK_LOCKMODE(MyProc, slot, lockmode))
            {
                Assert(!found);
                FAST_PATH_CLEAR_LOCKMODE(MyProc, slot, lockmode);
                found = true;
                /* keep scanning: the use count must cover every slot */
            }
        }

        /* Slots with no lock bits left do not count as in use */
        if (FAST_PATH_GET_BITS(MyProc, slot) == 0)
            continue;
        ++FastPathLocalUseCount;
    }
    return found;
}

/*
 * FastPathTransferRelationLocks
 *        Move every fast-path lock on the relation identified by locktag out
 *        of the per-backend fast-path arrays and into the shared hash table.
 *
 * Each PGPROC in ProcGlobal->allProcs is visited with its backendLock held
 * exclusively; the lock partition LWLock for hashcode is held only around
 * the shared-table updates for a matching slot.
 *
 * Returns true on success.  Returns false if a shared-table entry could not
 * be set up (ran out of shared memory); both LWLocks are released first.
 */
static bool
FastPathTransferRelationLocks(LockMethod lockMethodTable, const LOCKTAG *locktag,
                              uint32 hashcode)
{// #lizard forgives
    Oid            relid = locktag->locktag_field2;
    LWLock       *partitionLock = LockHashPartitionLock(hashcode);
    uint32        procno;

    /*
     * ProcGlobal->allProcs contains every PGPROC that can potentially hold a
     * fast-path lock.  Prepared transactions are not in it, but any
     * fast-path locks they held are transferred to the main lock table, so
     * scanning allProcs is sufficient.
     */
    for (procno = 0; procno < ProcGlobal->allProcCount; procno++)
    {
        PGPROC       *proc = &ProcGlobal->allProcs[procno];
        uint32        slot;

        LWLockAcquire(&proc->backendLock, LW_EXCLUSIVE);

        /*
         * A backend attached to a different database cannot hold a relevant
         * entry, so its slots are only scanned when the database matches.
         *
         * proc->databaseId is set at backend startup and never changes, so
         * testing it before taking backendLock might be safe: a backend that
         * holds fast-path locks must have executed a memory fence (an LWLock
         * acquisition) after setting it.  What is less clear is whether *our*
         * backend has executed a fence since the other backend stored the
         * value.  To stay on the safe side the test is made with the LWLock
         * held.
         */
        if (proc->databaseId == locktag->locktag_field1)
        {
            for (slot = 0; slot < FP_LOCK_SLOTS_PER_BACKEND; slot++)
            {
                uint32        mode;

                /* Skip slots that are empty or belong to another relation. */
                if (FAST_PATH_GET_BITS(proc, slot) == 0 ||
                    proc->fpRelId[slot] != relid)
                    continue;

                /* Re-create each mode held in this slot in the main table. */
                LWLockAcquire(partitionLock, LW_EXCLUSIVE);
                for (mode = FAST_PATH_LOCKNUMBER_OFFSET;
                     mode < FAST_PATH_LOCKNUMBER_OFFSET + FAST_PATH_BITS_PER_SLOT;
                     mode++)
                {
                    PROCLOCK   *proclock;

                    if (!FAST_PATH_CHECK_LOCKMODE(proc, slot, mode))
                        continue;

                    proclock = SetupLockInTable(lockMethodTable, proc, locktag,
                                                hashcode, mode);
                    if (proclock == NULL)
                    {
                        /* could not set up the entry: give up cleanly */
                        LWLockRelease(partitionLock);
                        LWLockRelease(&proc->backendLock);
                        return false;
                    }
                    GrantLock(proclock->tag.myLock, proclock, mode);
                    FAST_PATH_CLEAR_LOCKMODE(proc, slot, mode);
                }
                LWLockRelease(partitionLock);

                /* The remaining slots of this backend need not be examined. */
                break;
            }
        }

        LWLockRelease(&proc->backendLock);
    }

    return true;
}

/*
 * FastPathGetRelationLockEntry
 *        Return the PROCLOCK for a lock that was originally taken via the
 *        fast-path, moving it into the primary lock table first if it is
 *        still recorded in MyProc's fast-path array.
 *
 * If our fast-path array no longer records the lock, the existing entry is
 * looked up in the shared tables instead; failing to find it there is an
 * error.  Running out of shared memory while creating the entry is reported
 * with ereport(ERROR) after both LWLocks have been released.
 *
 * Note: updating the locallock object is left to the caller.
 */
static PROCLOCK *
FastPathGetRelationLockEntry(LOCALLOCK *locallock)
{// #lizard forgives
    LockMethod    lockMethodTable = LockMethods[DEFAULT_LOCKMETHOD];
    LOCKTAG    *locktag = &locallock->tag.lock;
    Oid            relid = locktag->locktag_field2;
    uint32        lockmode = locallock->tag.mode;
    LWLock       *partitionLock = LockHashPartitionLock(locallock->hashcode);
    PROCLOCK   *proclock = NULL;
    PROCLOCKTAG proclocktag;
    uint32        proclock_hashcode;
    LOCK       *lock;
    uint32        slot;

    LWLockAcquire(&MyProc->backendLock, LW_EXCLUSIVE);

    /* Find the first allocated fast-path slot for this relation, if any. */
    for (slot = 0; slot < FP_LOCK_SLOTS_PER_BACKEND; slot++)
    {
        if (relid == MyProc->fpRelId[slot] &&
            FAST_PATH_GET_BITS(MyProc, slot) != 0)
            break;
    }

    /*
     * Transfer the lock only if such a slot exists and it really records the
     * mode we are after; otherwise there is nothing for us to move.
     */
    if (slot < FP_LOCK_SLOTS_PER_BACKEND &&
        FAST_PATH_CHECK_LOCKMODE(MyProc, slot, lockmode))
    {
        /* Find or create the shared lock object. */
        LWLockAcquire(partitionLock, LW_EXCLUSIVE);

        proclock = SetupLockInTable(lockMethodTable, MyProc, locktag,
                                    locallock->hashcode, lockmode);
        if (proclock == NULL)
        {
            LWLockRelease(partitionLock);
            LWLockRelease(&MyProc->backendLock);
            ereport(ERROR,
                    (errcode(ERRCODE_OUT_OF_MEMORY),
                     errmsg("out of shared memory"),
                     errhint("You might need to increase max_locks_per_transaction.")));
        }
        GrantLock(proclock->tag.myLock, proclock, lockmode);
        FAST_PATH_CLEAR_LOCKMODE(MyProc, slot, lockmode);

        LWLockRelease(partitionLock);
    }

    LWLockRelease(&MyProc->backendLock);

    /* Done if we performed the transfer ourselves. */
    if (proclock != NULL)
        return proclock;

    /*
     * Otherwise some other backend may already have transferred the lock;
     * re-find the LOCK and PROCLOCK in the shared tables.
     */
    LWLockAcquire(partitionLock, LW_SHARED);

    lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
                                                (void *) locktag,
                                                locallock->hashcode,
                                                HASH_FIND,
                                                NULL);
    if (lock == NULL)
        elog(ERROR, "failed to re-find shared lock object");

    proclocktag.myLock = lock;
    proclocktag.myProc = MyProc;
    proclock_hashcode = ProcLockHashCode(&proclocktag, locallock->hashcode);

    proclock = (PROCLOCK *)
        hash_search_with_hash_value(LockMethodProcLockHash,
                                    (void *) &proclocktag,
                                    proclock_hashcode,
                                    HASH_FIND,
                                    NULL);
    if (proclock == NULL)
        elog(ERROR, "failed to re-find shared proclock object");

    LWLockRelease(partitionLock);

    return proclock;
}

/*
 * GetLockConflicts
 *        Get an array of VirtualTransactionIds of xacts currently holding locks
 *        that would conflict with the specified lock/lockmode.
 *        xacts merely awaiting such a lock are NOT reported.
 *
 * locktag identifies the lockable object; lockmode is the mode whose
 * conflicts are wanted.  An unrecognized lock method or lock mode is
 * reported with elog(ERROR).
 *
 * The result array is terminated with an invalid VXID.  Normally it is
 * freshly palloc'd on each call.  When InHotStandby is true, however, the
 * array is allocated only once, in TopMemoryContext, and that same static
 * buffer is returned by every call; the contents are therefore overwritten
 * by the next call.
 *
 * Of course, the result could be out of date by the time it's returned,
 * so use of this function has to be thought about carefully.
 *
 * Note we never include the current xact's vxid in the result array,
 * since an xact never blocks itself.  Also, prepared transactions are
 * ignored, which is a bit more debatable but is appropriate for current
 * uses of the result.
 */
VirtualTransactionId *
GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode)
{// #lizard forgives
    /* static so that the hot-standby allocation survives across calls */
    static VirtualTransactionId *vxids;
    LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
    LockMethod    lockMethodTable;
    LOCK       *lock;
    LOCKMASK    conflictMask;
    SHM_QUEUE  *procLocks;
    PROCLOCK   *proclock;
    uint32        hashcode;
    LWLock       *partitionLock;
    int            count = 0;
    int            fast_count = 0;

    if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
        elog(ERROR, "unrecognized lock method: %d", lockmethodid);
    lockMethodTable = LockMethods[lockmethodid];
    if (lockmode <= 0 || lockmode > lockMethodTable->numLockModes)
        elog(ERROR, "unrecognized lock mode: %d", lockmode);

    /*
     * Allocate memory to store results.  We only need enough space for
     * MaxBackends + a terminator, since prepared xacts don't count.
     * InHotStandby allocate once in TopMemoryContext.
     *
     * NOTE(review): only the non-standby path uses palloc0; the hot-standby
     * path uses MemoryContextAlloc and reuses the buffer, so presumably its
     * contents are not cleared between calls.  The terminator written
     * explicitly before each return is what marks the end of the result.
     */
    if (InHotStandby)
    {
        if (vxids == NULL)
            vxids = (VirtualTransactionId *)
                MemoryContextAlloc(TopMemoryContext,
                                   sizeof(VirtualTransactionId) * (MaxBackends + 1));
    }
    else
        vxids = (VirtualTransactionId *)
            palloc0(sizeof(VirtualTransactionId) * (MaxBackends + 1));

    /* Compute hash code and partition lock, and look up conflicting modes. */
    hashcode = LockTagHashCode(locktag);
    partitionLock = LockHashPartitionLock(hashcode);
    conflictMask = lockMethodTable->conflictTab[lockmode];

    /*
     * Fast path locks might not have been entered in the primary lock table.
     * If the lock we're dealing with could conflict with such a lock, we must
     * examine each backend's fast-path array for conflicts.
     */
    if (ConflictsWithRelationFastPath(locktag, lockmode))
    {
        int            i;
        Oid            relid = locktag->locktag_field2;
        VirtualTransactionId vxid;

        /*
         * Iterate over relevant PGPROCs.  Anything held by a prepared
         * transaction will have been transferred to the primary lock table,
         * so we need not worry about those.  This is all a bit fuzzy, because
         * new locks could be taken after we've visited a particular
         * partition, but the callers had better be prepared to deal with that
         * anyway, since the locks could equally well be taken between the
         * time we return the value and the time the caller does something
         * with it.
         */
        for (i = 0; i < ProcGlobal->allProcCount; i++)
        {
            PGPROC       *proc = &ProcGlobal->allProcs[i];
            uint32        f;

            /* A backend never blocks itself */
            if (proc == MyProc)
                continue;

            /* We only read the fast-path array, so shared mode is enough. */
            LWLockAcquire(&proc->backendLock, LW_SHARED);

            /*
             * If the target backend isn't referencing the same database as
             * the lock, then we needn't examine the individual relation IDs
             * at all; none of them can be relevant.
             *
             * See FastPathTransferRelationLocks() for discussion of why we
             * do this test after acquiring the lock.
             */
            if (proc->databaseId != locktag->locktag_field1)
            {
                LWLockRelease(&proc->backendLock);
                continue;
            }

            for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; f++)
            {
                uint32        lockmask;

                /* Look for an allocated slot matching the given relid. */
                if (relid != proc->fpRelId[f])
                    continue;
                lockmask = FAST_PATH_GET_BITS(proc, f);
                if (!lockmask)
                    continue;
                /* Shift the slot's bits so they can be tested against conflictMask. */
                lockmask <<= FAST_PATH_LOCKNUMBER_OFFSET;

                /*
                 * There can only be one entry per relation, so if we found it
                 * and it doesn't conflict, we can skip the rest of the slots.
                 */
                if ((lockmask & conflictMask) == 0)
                    break;

                /* Conflict! */
                GET_VXID_FROM_PGPROC(vxid, *proc);

                /*
                 * If we see an invalid VXID, then either the xact has already
                 * committed (or aborted), or it's a prepared xact.  In either
                 * case we may ignore it.
                 */
                if (VirtualTransactionIdIsValid(vxid))
                    vxids[count++] = vxid;

                /* No need to examine remaining slots. */
                break;
            }

            LWLockRelease(&proc->backendLock);
        }
    }

    /*
     * Remember how many fast-path conflicts we found.  Entries
     * [0, fast_count) are the ones the duplicate check below compares
     * against.
     */
    fast_count = count;

    /*
     * Look up the lock object matching the tag.
     */
    LWLockAcquire(partitionLock, LW_SHARED);

    lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
                                                (const void *) locktag,
                                                hashcode,
                                                HASH_FIND,
                                                NULL);
    if (!lock)
    {
        /*
         * If the lock object doesn't exist, there is nothing holding a lock
         * on this lockable object in the primary table.  Terminate the array
         * after any fast-path conflicts already collected and return it.
         */
        LWLockRelease(partitionLock);
        vxids[count].backendId = InvalidBackendId;
        vxids[count].localTransactionId = InvalidLocalTransactionId;
        return vxids;
    }

    /*
     * Examine each existing holder (or awaiter) of the lock.
     */

    procLocks = &(lock->procLocks);

    proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
                                         offsetof(PROCLOCK, lockLink));

    while (proclock)
    {
        /* Only holdMask is tested, so pure waiters are not reported. */
        if (conflictMask & proclock->holdMask)
        {
            PGPROC       *proc = proclock->tag.myProc;

            /* A backend never blocks itself */
            if (proc != MyProc)
            {
                VirtualTransactionId vxid;

                GET_VXID_FROM_PGPROC(vxid, *proc);

                /*
                 * If we see an invalid VXID, then either the xact has already
                 * committed (or aborted), or it's a prepared xact.  In either
                 * case we may ignore it.
                 */
                if (VirtualTransactionIdIsValid(vxid))
                {
                    int            i;

                    /*
                     * Avoid duplicate entries: skip this vxid if the
                     * fast-path scan already reported it.
                     */
                    for (i = 0; i < fast_count; ++i)
                        if (VirtualTransactionIdEquals(vxids[i], vxid))
                            break;
                    if (i >= fast_count)
                        vxids[count++] = vxid;
                }
            }
        }

        proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->lockLink,
                                             offsetof(PROCLOCK, lockLink));
    }

    LWLockRelease(partitionLock);

    /*
     * NOTE(review): this check runs only after the array has been filled, so
     * it detects an overrun of the MaxBackends + 1 element buffer rather than
     * preventing one.
     */
    if (count > MaxBackends)    /* should never happen */
        elog(PANIC, "too many conflicting locks found");

    /* Terminate the result with an invalid VXID. */
    vxids[count].backendId = InvalidBackendId;
    vxids[count].localTransactionId = InvalidLocalTransactionId;
    return vxids;
}

/*
 * LockRefindAndRelease
 *        Look up an existing LOCK/PROCLOCK pair in the shared lock table and
 *        release one lock mode held through it.
 *
 * It is up to the caller to know that this is a sane thing to do; for
 * instance, it would be bad to release a lock here while a LOCALLOCK object
 * might still hold pointers to it.
 *
 * There are currently two users: releasing the locks of a prepared
 * transaction at commit (see lock_twophase_postcommit), and releasing locks
 * that were taken via the fast-path, transferred to the main hash table, and
 * then released (see LockReleaseAll).
 *
 * lockMethodTable: lock method table, used for releasing and for mode names
 * proc: the PGPROC recorded as holder in the PROCLOCK to look up
 * locktag, lockmode: identify the lock and the mode to release
 * decrement_strong_lock_count: if true, and the lock passes
 *        ConflictsWithRelationFastPath(), also decrement the matching
 *        FastPathStrongRelationLocks counter (needed only for 2PC)
 *
 * Not finding the LOCK or the PROCLOCK is a PANIC.  If the PROCLOCK does not
 * hold lockmode, a WARNING is issued and nothing is released.
 */
static void
LockRefindAndRelease(LockMethod lockMethodTable, PGPROC *proc,
                     LOCKTAG *locktag, LOCKMODE lockmode,
                     bool decrement_strong_lock_count)
{
    uint32        hashcode = LockTagHashCode(locktag);
    LWLock       *partitionLock = LockHashPartitionLock(hashcode);
    PROCLOCKTAG proclocktag;
    uint32        proclock_hashcode;
    uint32        fasthashcode;
    LOCK       *lock;
    PROCLOCK   *proclock;
    bool        wakeupNeeded;

    LWLockAcquire(partitionLock, LW_EXCLUSIVE);

    /* The lock object had better still be there. */
    lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
                                                (void *) locktag,
                                                hashcode,
                                                HASH_FIND,
                                                NULL);
    if (lock == NULL)
        elog(PANIC, "failed to re-find shared lock object");

    /* Likewise the proclock tying that lock to proc. */
    proclocktag.myProc = proc;
    proclocktag.myLock = lock;
    proclock_hashcode = ProcLockHashCode(&proclocktag, hashcode);

    proclock = (PROCLOCK *) hash_search_with_hash_value(LockMethodProcLockHash,
                                                        (void *) &proclocktag,
                                                        proclock_hashcode,
                                                        HASH_FIND,
                                                        NULL);
    if (proclock == NULL)
        elog(PANIC, "failed to re-find shared proclock object");

    /* Refuse to release a mode that this proclock does not actually hold. */
    if ((proclock->holdMask & LOCKBIT_ON(lockmode)) == 0)
    {
        PROCLOCK_PRINT("lock_twophase_postcommit: WRONGTYPE", proclock);
        LWLockRelease(partitionLock);
        elog(WARNING, "you don't own a lock of type %s",
             lockMethodTable->lockModeNames[lockmode]);
        return;
    }

    /* Release it; CleanUpLock wakes any waiters that can now proceed. */
    wakeupNeeded = UnGrantLock(lock, lockmode, proclock, lockMethodTable);
    CleanUpLock(lock, proclock, lockMethodTable, hashcode, wakeupNeeded);

    LWLockRelease(partitionLock);

    /*
     * The remainder adjusts the strong lock count, which is needed only for
     * 2PC and only for locks that conflict with relation fast-path locking.
     */
    if (!decrement_strong_lock_count)
        return;
    if (!ConflictsWithRelationFastPath(locktag, lockmode))
        return;

    fasthashcode = FastPathStrongLockHashPartition(hashcode);

    SpinLockAcquire(&FastPathStrongRelationLocks->mutex);
    Assert(FastPathStrongRelationLocks->count[fasthashcode] > 0);
    FastPathStrongRelationLocks->count[fasthashcode]--;
    SpinLockRelease(&FastPathStrongRelationLocks->mutex);
}

/*
 * AtPrepare_Locks
 *        Do the preparatory work for a PREPARE: make 2PC state file records
 *        for all locks currently held.
 *
 * Session-level locks are ignored, as are VXID locks.
 *
 * There are some special cases that we error out on: we can't be holding any
 * locks at both session and transaction level (since we must either keep or
 * give away the PROCLOCK object), and we can't be holding any locks on
 * temporary objects (since that would mess up the current backend if it tries
 * to exit before the prepared xact is committed).
 */
void
AtPrepare_Locks(void)
{// #lizard forgives
    HASH_SEQ_STATUS status;
    LOCALLOCK  *locallock;

    /*
     * For the most part, we don't need to touch shared memory for this ---
     * all the necessary state information is in the locallock table.
     * Fast-path locks are an exception, however: we move any such locks to
     * the main table before allowing PREPARE TRANSACTION to succeed.
     */
    hash_seq_init(&status, LockMethodLocalHash);

    while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
    {
        TwoPhaseLockRecord record;
        LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
        bool        haveSessionLock;
        bool        haveXactLock;
        int            i;

        /*
         * Ignore VXID locks.  We don't want those to be held by prepared
         * transactions, since they aren't meaningful after a restart.
         */
        if (locallock->tag.lock.locktag_type == LOCKTAG_VIRTUALTRANSACTION)
            continue;

        /* Ignore it if we don't actually hold the lock */
        if (locallock->nLocks <= 0)
            continue;

        /* Scan to see whether we hold it at session or transaction level */
        haveSessionLock = haveXactLock = false;
        for (i = locallock->numLockOwners - 1; i >= 0; i--)
        {
            if (lockOwners[i].owner == NULL)
                haveSessionLock = true;
            else
                haveXactLock = true;
        }

        /* Ignore it if we have only session lock */
        if (!haveXactLock)
            continue;

        /*
         * If we have both session- and transaction-level locks, fail.  This
         * should never happen with regular locks, since we only take those at
         * session level in some special operations like VACUUM.  It's
         * possible to hit this with advisory locks, though.
         *
         * It would be nice if we could keep the session hold and give away
         * the transactional hold to the prepared xact.  However, that would
         * require two PROCLOCK objects, and we cannot be sure that another
         * PROCLOCK will be available when it comes time for PostPrepare_Locks
         * to do the deed.  So for now, we error out while we can still do so
         * safely.
         */
        if (haveSessionLock)
            ereport(ERROR,
                    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                     errmsg("cannot PREPARE while holding both session-level and transaction-level locks on the same object")));

        /*
         * If the local lock was taken via the fast-path, we need to move it
         * to the primary lock table, or just get a pointer to the existing
         * primary lock table entry if by chance it's already been
         * transferred.
         */
        if (locallock->proclock == NULL)
        {
            locallock->proclock = FastPathGetRelationLockEntry(locallock);
            locallock->lock = locallock->proclock->tag.myLock;
        }

        /*
         * Arrange to not release any strong lock count held by this lock
         * entry.  We must retain the count until the prepared transaction is
         * committed or rolled back.
         */
        locallock->holdsStrongLockCount = FALSE;

        /*
         * Create a 2PC record.
         */
        memcpy(&(record.locktag), &(locallock->tag.lock), sizeof(LOCKTAG));
        record.lockmode = locallock->tag.mode;

        RegisterTwoPhaseRecord(TWOPHASE_RM_LOCK_ID, 0,
                               &record, sizeof(TwoPhaseLockRecord));
    }
}

/*
 * PostPrepare_Locks
 *        Clean up after successful PREPARE
 *
 * Here, we want to transfer ownership of our locks to a dummy PGPROC
 * that's now associated with the prepared transaction, and we want to
 * clean out the corresponding entries in the LOCALLOCK table.
 *
 * xid is the transaction ID of the prepared transaction; it is used only to
 * obtain the dummy PGPROC via TwoPhaseGetDummyProc().
 *
 * The whole operation runs inside a critical section, and unexpected states
 * are reported at PANIC level.
 *
 * Note: by removing the LOCALLOCK entries, we are leaving dangling
 * pointers in the transaction's resource owner.  This is OK at the
 * moment since resowner.c doesn't try to free locks retail at a toplevel
 * transaction commit or abort.  We could alternatively zero out nLocks
 * and leave the LOCALLOCK entries to be garbage-collected by LockReleaseAll,
 * but that probably costs more cycles.
 */
void
PostPrepare_Locks(TransactionId xid)
{// #lizard forgives
    /* dummy PGPROC that will own the transferred proclocks */
    PGPROC       *newproc = TwoPhaseGetDummyProc(xid);
    HASH_SEQ_STATUS status;
    LOCALLOCK  *locallock;
    LOCK       *lock;
    PROCLOCK   *proclock;
    PROCLOCKTAG proclocktag;
    int            partition;

    /* Can't prepare a lock group follower. */
    Assert(MyProc->lockGroupLeader == NULL ||
           MyProc->lockGroupLeader == MyProc);

    /* This is a critical section: any error means big trouble */
    START_CRIT_SECTION();

    /*
     * First we run through the locallock table and get rid of unwanted
     * entries, then we scan the process's proclocks and transfer them to the
     * target proc.
     *
     * We do this separately because we may have multiple locallock entries
     * pointing to the same proclock, and we daren't end up with any dangling
     * pointers.
     */
    hash_seq_init(&status, LockMethodLocalHash);

    while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
    {
        LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
        bool        haveSessionLock;
        bool        haveXactLock;
        int            i;

        if (locallock->proclock == NULL || locallock->lock == NULL)
        {
            /*
             * We must've run out of shared memory while trying to set up this
             * lock.  Just forget the local entry.
             */
            Assert(locallock->nLocks == 0);
            RemoveLocalLock(locallock);
            continue;
        }

        /* Ignore VXID locks */
        if (locallock->tag.lock.locktag_type == LOCKTAG_VIRTUALTRANSACTION)
            continue;

        /*
         * Scan to see whether we hold it at session or transaction level.
         * An owner of NULL marks a session-level hold.
         */
        haveSessionLock = haveXactLock = false;
        for (i = locallock->numLockOwners - 1; i >= 0; i--)
        {
            if (lockOwners[i].owner == NULL)
                haveSessionLock = true;
            else
                haveXactLock = true;
        }

        /* Ignore it if we have only session lock */
        if (!haveXactLock)
            continue;

        /* This can't happen, because we already checked it */
        if (haveSessionLock)
            ereport(PANIC,
                    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                     errmsg("cannot PREPARE while holding both session-level and transaction-level locks on the same object")));

        /*
         * Mark the proclock to show we need to release this lockmode.  The
         * second pass below uses releaseMask to tell proclocks that must be
         * handed over from ones that are held only at session level.
         */
        if (locallock->nLocks > 0)
            locallock->proclock->releaseMask |= LOCKBIT_ON(locallock->tag.mode);

        /* And remove the locallock hashtable entry */
        RemoveLocalLock(locallock);
    }

    /*
     * Now, scan each lock partition separately.
     */
    for (partition = 0; partition < NUM_LOCK_PARTITIONS; partition++)
    {
        LWLock       *partitionLock;
        SHM_QUEUE  *procLocks = &(MyProc->myProcLocks[partition]);
        PROCLOCK   *nextplock;

        partitionLock = LockHashPartitionLockByIndex(partition);

        /*
         * If the proclock list for this partition is empty, we can skip
         * acquiring the partition lock.  This optimization is safer than the
         * situation in LockReleaseAll, because we got rid of any fast-path
         * locks during AtPrepare_Locks, so there cannot be any case where
         * another backend is adding something to our lists now.  For safety,
         * though, we code this the same way as in LockReleaseAll.
         */
        if (SHMQueueNext(procLocks, procLocks,
                         offsetof(PROCLOCK, procLink)) == NULL)
            continue;            /* needn't examine this partition */

        LWLockAcquire(partitionLock, LW_EXCLUSIVE);

        for (proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
                                                  offsetof(PROCLOCK, procLink));
             proclock;
             proclock = nextplock)
        {
            /* Get link first, since we may unlink/relink this proclock */
            nextplock = (PROCLOCK *)
                SHMQueueNext(procLocks, &proclock->procLink,
                             offsetof(PROCLOCK, procLink));

            Assert(proclock->tag.myProc == MyProc);

            lock = proclock->tag.myLock;

            /* Ignore VXID locks */
            if (lock->tag.locktag_type == LOCKTAG_VIRTUALTRANSACTION)
                continue;

            PROCLOCK_PRINT("PostPrepare_Locks", proclock);
            LOCK_PRINT("PostPrepare_Locks", lock, 0);
            Assert(lock->nRequested >= 0);
            Assert(lock->nGranted >= 0);
            Assert(lock->nGranted <= lock->nRequested);
            Assert((proclock->holdMask & ~lock->grantMask) == 0);

            /* Ignore it if nothing to release (must be a session lock) */
            if (proclock->releaseMask == 0)
                continue;

            /*
             * Else we should be releasing all locks: every mode held through
             * this proclock must have been marked by the first pass.
             */
            if (proclock->releaseMask != proclock->holdMask)
                elog(PANIC, "we seem to have dropped a bit somewhere");

            /*
             * We cannot simply modify proclock->tag.myProc to reassign
             * ownership of the lock, because that's part of the hash key and
             * the proclock would then be in the wrong hash chain.  Instead
             * use hash_update_hash_key.  (We used to create a new hash entry,
             * but that risks out-of-memory failure if other processes are
             * busy making proclocks too.)  We must unlink the proclock from
             * our procLink chain and put it into the new proc's chain, too.
             *
             * Note: the updated proclock hash key will still belong to the
             * same hash partition, cf proclock_hash().  So the partition lock
             * we already hold is sufficient for this.
             */
            SHMQueueDelete(&proclock->procLink);

            /*
             * Create the new hash key for the proclock.
             */
            proclocktag.myLock = lock;
            proclocktag.myProc = newproc;

            /*
             * Update groupLeader pointer to point to the new proc.  (We'd
             * better not be a member of somebody else's lock group!)
             */
            Assert(proclock->groupLeader == proclock->tag.myProc);
            proclock->groupLeader = newproc;

            /*
             * Update the proclock.  We should not find any existing entry for
             * the same hash key, since there can be only one entry for any
             * given lock with my own proc.
             */
            if (!hash_update_hash_key(LockMethodProcLockHash,
                                      (void *) proclock,
                                      (void *) &proclocktag))
                elog(PANIC, "duplicate entry found while reassigning a prepared transaction's locks");

            /* Re-link into the new proc's proclock list */
            SHMQueueInsertBefore(&(newproc->myProcLocks[partition]),
                                 &proclock->procLink);

            PROCLOCK_PRINT("PostPrepare_Locks: updated", proclock);
        }                        /* loop over PROCLOCKs within this partition */

        LWLockRelease(partitionLock);
    }                            /* loop over partitions */

    END_CRIT_SECTION();
}


/*
 * LockShmemSize
 *        Estimate the shared-memory space needed for the lock tables.
 *
 * The LOCK hash table is sized for NLOCKENTS() entries and the PROCLOCK hash
 * table for twice that many.  Because NLOCKENTS() is itself only an
 * estimate, a 10% safety margin is added to the sum.
 */
Size
LockShmemSize(void)
{
    long        lock_entries = NLOCKENTS();
    long        proclock_entries = lock_entries * 2;
    Size        total = 0;

    /* shared LOCK hash table */
    total = add_size(total, hash_estimate_size(lock_entries, sizeof(LOCK)));

    /* shared PROCLOCK hash table */
    total = add_size(total,
                     hash_estimate_size(proclock_entries, sizeof(PROCLOCK)));

    /* safety margin: one tenth of the subtotal */
    return add_size(total, total / 10);
}

/*
 * GetLockStatusData - Return a summary of the lock manager's internal
 * status, for use in a user-level reporting function.
 *
 * The return data consists of an array of LockInstanceData objects,
 * which are a lightly abstracted version of the PROCLOCK data structures,
 * i.e. there is one entry for each unique lock and interested PGPROC.
 * It is the caller's responsibility to match up related items (such as
 * references to the same lockable object or PGPROC) if wanted.
 *
 * The result (both the LockData header and its locks[] array) is palloc'd.
 * data->nelements is the number of valid entries in data->locks.  Entries
 * describing fast-path locks come first and have fastpath = true; entries
 * copied from the shared PROCLOCK hash table follow and have
 * fastpath = false.
 *
 * The design goal is to hold the LWLocks for as short a time as possible;
 * thus, this function simply makes a copy of the necessary data and releases
 * the locks, allowing the caller to contemplate and format the data for as
 * long as it pleases.
 */
LockData *
GetLockStatusData(void)
{// #lizard forgives
    LockData   *data;
    PROCLOCK   *proclock;
    HASH_SEQ_STATUS seqstat;
    int            els;            /* allocated length of data->locks */
    int            el;                /* number of entries filled in so far */
    int            i;

    data = (LockData *) palloc(sizeof(LockData));

    /* Guess how much space we'll need. */
    els = MaxBackends;
    el = 0;
    data->locks = (LockInstanceData *) palloc(sizeof(LockInstanceData) * els);

    /*
     * First, we iterate through the per-backend fast-path arrays, locking
     * them one at a time.  This might produce an inconsistent picture of the
     * system state, but taking all of those LWLocks at the same time seems
     * impractical (in particular, note MAX_SIMUL_LWLOCKS).  It shouldn't
     * matter too much, because none of these locks can be involved in lock
     * conflicts anyway - anything that might must be present in the main lock
     * table.  (For the same reason, we don't sweat about making leaderPid
     * completely valid.  We cannot safely dereference another backend's
     * lockGroupLeader field without holding all lock partition locks, and
     * it's not worth that.)
     */
    for (i = 0; i < ProcGlobal->allProcCount; ++i)
    {
        PGPROC       *proc = &ProcGlobal->allProcs[i];
        uint32        f;

        LWLockAcquire(&proc->backendLock, LW_SHARED);

        /* Report one entry per occupied fast-path relation slot. */
        for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; ++f)
        {
            LockInstanceData *instance;
            uint32        lockbits = FAST_PATH_GET_BITS(proc, f);

            /* Skip unallocated slots. */
            if (!lockbits)
                continue;

            /* Grow the output array by another MaxBackends entries if full. */
            if (el >= els)
            {
                els += MaxBackends;
                data->locks = (LockInstanceData *)
                    repalloc(data->locks, sizeof(LockInstanceData) * els);
            }

            instance = &data->locks[el];
            SET_LOCKTAG_RELATION(instance->locktag, proc->databaseId,
                                 proc->fpRelId[f]);
            /* Shift the slot's mode bits up to form a regular hold mask. */
            instance->holdMask = lockbits << FAST_PATH_LOCKNUMBER_OFFSET;
            instance->waitLockMode = NoLock;
            instance->backend = proc->backendId;
            instance->lxid = proc->lxid;
            instance->pid = proc->pid;
            /* Approximation: see the note about leaderPid above. */
            instance->leaderPid = proc->pid;
            instance->fastpath = true;

            el++;
        }

        /* Also report the backend's fast-path VXID lock, if it has one. */
        if (proc->fpVXIDLock)
        {
            VirtualTransactionId vxid;
            LockInstanceData *instance;

            if (el >= els)
            {
                els += MaxBackends;
                data->locks = (LockInstanceData *)
                    repalloc(data->locks, sizeof(LockInstanceData) * els);
            }

            vxid.backendId = proc->backendId;
            vxid.localTransactionId = proc->fpLocalTransactionId;

            instance = &data->locks[el];
            SET_LOCKTAG_VIRTUALTRANSACTION(instance->locktag, vxid);
            instance->holdMask = LOCKBIT_ON(ExclusiveLock);
            instance->waitLockMode = NoLock;
            instance->backend = proc->backendId;
            instance->lxid = proc->lxid;
            instance->pid = proc->pid;
            instance->leaderPid = proc->pid;
            instance->fastpath = true;

            el++;
        }

        LWLockRelease(&proc->backendLock);
    }

    /*
     * Next, acquire lock on the entire shared lock data structure.  We do
     * this so that, at least for locks in the primary lock table, the state
     * will be self-consistent.
     *
     * Since this is a read-only operation, we take shared instead of
     * exclusive lock.  There's not a whole lot of point to this, because all
     * the normal operations require exclusive lock, but it doesn't hurt
     * anything either. It will at least allow two backends to do
     * GetLockStatusData in parallel.
     *
     * Must grab LWLocks in partition-number order to avoid LWLock deadlock.
     */
    for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
        LWLockAcquire(LockHashPartitionLockByIndex(i), LW_SHARED);

    /*
     * Now we can safely count the number of proclocks.  The final element
     * count is the fast-path entries gathered above plus one entry per
     * PROCLOCK, so enlarge the array once, up front, if that won't fit.
     */
    data->nelements = el + hash_get_num_entries(LockMethodProcLockHash);
    if (data->nelements > els)
    {
        els = data->nelements;
        data->locks = (LockInstanceData *)
            repalloc(data->locks, sizeof(LockInstanceData) * els);
    }

    /* Now scan the tables to copy the data */
    hash_seq_init(&seqstat, LockMethodProcLockHash);

    while ((proclock = (PROCLOCK *) hash_seq_search(&seqstat)))
    {
        PGPROC       *proc = proclock->tag.myProc;
        LOCK       *lock = proclock->tag.myLock;
        LockInstanceData *instance = &data->locks[el];

        memcpy(&instance->locktag, &lock->tag, sizeof(LOCKTAG));
        instance->holdMask = proclock->holdMask;
        /* Report a wait mode only if the proc is waiting on this very lock. */
        if (proc->waitLock == proclock->tag.myLock)
            instance->waitLockMode = proc->waitLockMode;
        else
            instance->waitLockMode = NoLock;
        instance->backend = proc->backendId;
        instance->lxid = proc->lxid;
        instance->pid = proc->pid;
        instance->leaderPid = proclock->groupLeader->pid;
        instance->fastpath = false;

        el++;
    }

    /*
     * And release locks.  We do this in reverse order for two reasons: (1)
     * Anyone else who needs more than one of the locks will be trying to lock
     * them in increasing order; we don't want to release the other process
     * until it can get all the locks it needs. (2) This avoids O(N^2)
     * behavior inside LWLockRelease.
     */
    for (i = NUM_LOCK_PARTITIONS; --i >= 0;)
        LWLockRelease(LockHashPartitionLockByIndex(i));

    /* Every slot counted in nelements must have been filled in. */
    Assert(el == data->nelements);

    return data;
}

/*
 * GetBlockerStatusData - Return a summary of the lock manager's state
 * concerning locks that are blocking the specified PID or any member of
 * the PID's lock group, for use in a user-level reporting function.
 *
 * For each PID within the lock group that is awaiting some heavyweight lock,
 * the return data includes an array of LockInstanceData objects, which are
 * the same data structure used by GetLockStatusData; but unlike that function,
 * this one reports only the PROCLOCKs associated with the lock that that PID
 * is blocked on.  (Hence, all the locktags should be the same for any one
 * blocked PID.)  In addition, we return an array of the PIDs of those backends
 * that are ahead of the blocked PID in the lock's wait queue.  These can be
 * compared with the PIDs in the LockInstanceData objects to determine which
 * waiters are ahead of or behind the blocked PID in the queue.
 *
 * If blocked_pid isn't a valid backend PID or nothing in its lock group is
 * waiting on any heavyweight lock, return empty arrays.
 *
 * The design goal is to hold the LWLocks for as short a time as possible;
 * thus, this function simply makes a copy of the necessary data and releases
 * the locks, allowing the caller to contemplate and format the data for as
 * long as it pleases.
 */
BlockedProcsData *
GetBlockerStatusData(int blocked_pid)
{
    BlockedProcsData *data;
    PGPROC       *proc;
    int            i;

    data = (BlockedProcsData *) palloc(sizeof(BlockedProcsData));

    /*
     * Guess how much space we'll need, and preallocate.  Most of the time
     * this will avoid needing to do repalloc while holding the LWLocks.  (We
     * assume, but check with an Assert, that MaxBackends is enough entries
     * for the procs[] array; the other two could need enlargement, though.)
     */
    data->nprocs = data->nlocks = data->npids = 0;
    data->maxprocs = data->maxlocks = data->maxpids = MaxBackends;
    data->procs = (BlockedProcData *) palloc(sizeof(BlockedProcData) * data->maxprocs);
    data->locks = (LockInstanceData *) palloc(sizeof(LockInstanceData) * data->maxlocks);
    data->waiter_pids = (int *) palloc(sizeof(int) * data->maxpids);

    /*
     * Acquire lock on the entire shared lock data structure.  See notes
     * in GetLockStatusData().
     */
    for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
        LWLockAcquire(LockHashPartitionLockByIndex(i), LW_SHARED);

	/*
     * In order to search the ProcArray for blocked_pid and assume that that
     * entry won't immediately disappear under us, we must hold ProcArrayLock.
     * In addition, to examine the lock grouping fields of any other backend,
     * we must hold all the hash partition locks.  (Only one of those locks is
     * actually relevant for any one lock group, but we can't know which one
     * ahead of time.)    It's fairly annoying to hold all those locks
     * throughout this, but it's no worse than GetLockStatusData(), and it
     * does have the advantage that we're guaranteed to return a
     * self-consistent instantaneous state.
     */
    LWLockAcquire(ProcArrayLock, LW_SHARED);

    proc = BackendPidGetProcWithLock(blocked_pid);

    /* Nothing to do if it's gone */
    if (proc != NULL)
    {
        if (proc->lockGroupLeader == NULL)
        {
            /* Easy case, proc is not a lock group member */
            GetSingleProcBlockerStatusData(proc, data);
        }
        else
        {
            /* Examine all procs in proc's lock group */
            dlist_iter    iter;

            dlist_foreach(iter, &proc->lockGroupLeader->lockGroupMembers)
            {
                PGPROC       *memberProc;

                memberProc = dlist_container(PGPROC, lockGroupLink, iter.cur);
                GetSingleProcBlockerStatusData(memberProc, data);
            }
        }

		Assert(data->nprocs <= data->maxprocs);
	}

	LWLockRelease(ProcArrayLock);

        /*
         * And release locks.  See notes in GetLockStatusData().
         */
        for (i = NUM_LOCK_PARTITIONS; --i >= 0;)
            LWLockRelease(LockHashPartitionLockByIndex(i));

    return data;
}

/*
 * GetSingleProcBlockerStatusData
 *        Helper for GetBlockerStatusData: append to *data the information
 *        about one process, if that process is waiting for a lock.
 *
 * Does nothing when blocked_proc->waitLock is NULL.  Otherwise it claims the
 * next procs[] element, copies every PROCLOCK attached to the awaited lock
 * into locks[], and appends to waiter_pids[] the PIDs of the processes that
 * precede blocked_proc in the lock's wait queue.  locks[] and waiter_pids[]
 * are enlarged with repalloc when needed; procs[] is not.
 */
static void
GetSingleProcBlockerStatusData(PGPROC *blocked_proc, BlockedProcsData *data)
{// #lizard forgives
    LOCK       *awaited = blocked_proc->waitLock;
    BlockedProcData *entry;
    SHM_QUEUE  *holders;
    PROCLOCK   *holder;
    PROC_QUEUE *wait_queue;
    PGPROC     *queued;
    int         nqueued;
    int         pos;

    /* A process that is not waiting contributes nothing. */
    if (awaited == NULL)
        return;

    /* Claim the next procs[] element and note where its data will start. */
    entry = &data->procs[data->nprocs];
    data->nprocs++;
    entry->pid = blocked_proc->pid;
    entry->first_lock = data->nlocks;
    entry->first_waiter = data->npids;

    /*
     * Fast-path arrays can be skipped here: nothing recorded there could
     * belong to a contended lock.
     */

    /* Copy out each PROCLOCK hanging off the awaited lock. */
    holders = &(awaited->procLocks);
    for (holder = (PROCLOCK *) SHMQueueNext(holders, holders,
                                            offsetof(PROCLOCK, lockLink));
         holder != NULL;
         holder = (PROCLOCK *) SHMQueueNext(holders, &holder->lockLink,
                                            offsetof(PROCLOCK, lockLink)))
    {
        PGPROC     *owner = holder->tag.myProc;
        LOCK       *owned = holder->tag.myLock;
        LockInstanceData *slot;

        /* Make room for one more locks[] entry if the array is full. */
        if (data->nlocks >= data->maxlocks)
        {
            data->maxlocks += MaxBackends;
            data->locks = (LockInstanceData *)
                repalloc(data->locks,
                         sizeof(LockInstanceData) * data->maxlocks);
        }

        slot = &data->locks[data->nlocks];
        memcpy(&slot->locktag, &owned->tag, sizeof(LOCKTAG));
        slot->holdMask = holder->holdMask;
        /* Only report a wait mode if the owner waits on this same lock. */
        slot->waitLockMode = (owner->waitLock == owned) ?
            owner->waitLockMode : NoLock;
        slot->backend = owner->backendId;
        slot->lxid = owner->lxid;
        slot->pid = owner->pid;
        slot->leaderPid = holder->groupLeader->pid;
        slot->fastpath = false;
        data->nlocks++;
    }

    /* Ensure waiter_pids[] could take the whole wait queue if need be. */
    wait_queue = &(awaited->waitProcs);
    nqueued = wait_queue->size;

    if (nqueued > data->maxpids - data->npids)
    {
        data->maxpids = Max(data->maxpids + MaxBackends,
                            data->npids + nqueued);
        data->waiter_pids = (int *) repalloc(data->waiter_pids,
                                             sizeof(int) * data->maxpids);
    }

    /* Record the PIDs queued ahead of blocked_proc, then stop. */
    queued = (PGPROC *) wait_queue->links.next;
    for (pos = 0; pos < nqueued && queued != blocked_proc; pos++)
    {
        data->waiter_pids[data->npids++] = queued->pid;
        queued = (PGPROC *) queued->links.next;
    }

    /* Finish the procs[] element with the counts just accumulated. */
    entry->num_locks = data->nlocks - entry->first_lock;
    entry->num_waiters = data->npids - entry->first_waiter;
}

/*
 * Returns a list of currently held AccessExclusiveLocks, for use by
 * LogStandbySnapshot().  The result is a palloc'd array,
 * with the number of elements returned into *nlocks.
 *
 * XXX This currently takes a lock on all partitions of the lock table,
 * but it's possible to do better.  By reference counting locks and storing
 * the value in the ProcArray entry for each backend we could tell if any
 * locks need recording without having to acquire the partition locks and
 * scan the lock table.  Whether that's worth the additional overhead
 * is pretty dubious though.
 */
xl_standby_lock *
GetRunningTransactionLocks(int *nlocks)
{
    xl_standby_lock *accessExclusiveLocks;
    PROCLOCK   *proclock;
    HASH_SEQ_STATUS seqstat;
    int            i;
    int            index;
    int            els;

    /*
     * Acquire lock on the entire shared lock data structure.
     *
     * Must grab LWLocks in partition-number order to avoid LWLock deadlock.
     */
    for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
        LWLockAcquire(LockHashPartitionLockByIndex(i), LW_SHARED);

    /* Now we can safely count the number of proclocks */
    els = hash_get_num_entries(LockMethodProcLockHash);

    /*
     * Allocating enough space for all locks in the lock table is overkill,
     * but it's more convenient and faster than having to enlarge the array.
     */
    accessExclusiveLocks = palloc(els * sizeof(xl_standby_lock));

    /* Now scan the tables to copy the data */
    hash_seq_init(&seqstat, LockMethodProcLockHash);

    /*
     * If lock is a currently granted AccessExclusiveLock then it will have
     * just one proclock holder, so locks are never accessed twice in this
     * particular case. Don't copy this code for use elsewhere because in the
     * general case this will give you duplicate locks when looking at
     * non-exclusive lock types.
     */
    index = 0;
    while ((proclock = (PROCLOCK *) hash_seq_search(&seqstat)))
    {
        /* make sure this definition matches the one used in LockAcquire */
        if ((proclock->holdMask & LOCKBIT_ON(AccessExclusiveLock)) &&
            proclock->tag.myLock->tag.locktag_type == LOCKTAG_RELATION)
        {
            PGPROC       *proc = proclock->tag.myProc;
            PGXACT       *pgxact = &ProcGlobal->allPgXact[proc->pgprocno];
            LOCK       *lock = proclock->tag.myLock;
            TransactionId xid = pgxact->xid;

            /*
             * Don't record locks for transactions if we know they have
             * already issued their WAL record for commit but not yet released
             * lock. It is still possible that we see locks held by already
             * complete transactions, if they haven't yet zeroed their xids.
             */
            if (!TransactionIdIsValid(xid))
                continue;

            accessExclusiveLocks[index].xid = xid;
            accessExclusiveLocks[index].dbOid = lock->tag.locktag_field1;
            accessExclusiveLocks[index].relOid = lock->tag.locktag_field2;

            index++;
        }
    }

    Assert(index <= els);

    /*
     * And release locks.  We do this in reverse order for two reasons: (1)
     * Anyone else who needs more than one of the locks will be trying to lock
     * them in increasing order; we don't want to release the other process
     * until it can get all the locks it needs. (2) This avoids O(N^2)
     * behavior inside LWLockRelease.
     */
    for (i = NUM_LOCK_PARTITIONS; --i >= 0;)
        LWLockRelease(LockHashPartitionLockByIndex(i));

    *nlocks = index;
    return accessExclusiveLocks;
}

/*
 * GetLockmodeName
 *        Return the textual name of lock mode "mode" under the given lock
 *        method.
 *
 * Both arguments are range-checked only by Asserts; the returned string is
 * the method table's own lockModeNames[] entry.
 */
const char *
GetLockmodeName(LOCKMETHODID lockmethodid, LOCKMODE mode)
{
    LockMethod  methodTable;

    Assert(lockmethodid > 0 && lockmethodid < lengthof(LockMethods));
    methodTable = LockMethods[lockmethodid];

    Assert(mode > 0 && mode <= methodTable->numLockModes);
    return methodTable->lockModeNames[mode];
}

#ifdef LOCK_DEBUG
/*
 * DumpLocks
 *        Debug aid: print the lock "proc" is waiting on (if any), then every
 *        PROCLOCK on each of its per-partition myProcLocks lists together
 *        with the LOCK it refers to.  A NULL proc is silently ignored.
 *
 * The caller must already hold whatever LWLocks are appropriate.
 */
void
DumpLocks(PGPROC *proc)
{
    int         partno;

    if (proc == NULL)
        return;

    if (proc->waitLock)
        LOCK_PRINT("DumpLocks: waiting on", proc->waitLock, 0);

    for (partno = 0; partno < NUM_LOCK_PARTITIONS; partno++)
    {
        SHM_QUEUE  *listHead = &(proc->myProcLocks[partno]);
        PROCLOCK   *cur;

        /* walk this partition's list through the procLink chain */
        for (cur = (PROCLOCK *) SHMQueueNext(listHead, listHead,
                                             offsetof(PROCLOCK, procLink));
             cur != NULL;
             cur = (PROCLOCK *) SHMQueueNext(listHead, &cur->procLink,
                                             offsetof(PROCLOCK, procLink)))
        {
            LOCK       *lock;

            Assert(cur->tag.myProc == proc);

            lock = cur->tag.myLock;

            PROCLOCK_PRINT("DumpLocks", cur);
            LOCK_PRINT("DumpLocks", lock, 0);
        }
    }
}

/*
 * Dump all lmgr locks.
 *
 * Caller is responsible for having acquired appropriate LWLocks.
 */
void
DumpAllLocks(void)
{
    PGPROC       *proc;
    PROCLOCK   *proclock;
    LOCK       *lock;
    HASH_SEQ_STATUS status;

    proc = MyProc;

    if (proc && proc->waitLock)
        LOCK_PRINT("DumpAllLocks: waiting on", proc->waitLock, 0);

    hash_seq_init(&status, LockMethodProcLockHash);

    while ((proclock = (PROCLOCK *) hash_seq_search(&status)) != NULL)
    {
        PROCLOCK_PRINT("DumpAllLocks", proclock);

        lock = proclock->tag.myLock;
        if (lock)
            LOCK_PRINT("DumpAllLocks", lock, 0);
        else
            elog(LOG, "DumpAllLocks: proclock->tag.myLock = NULL");
    }
}
#endif                            /* LOCK_DEBUG */

/*
 * LOCK 2PC resource manager's routines
 */

/*
 * Re-acquire a lock belonging to a transaction that was prepared.
 *
 * recdata points to a TwoPhaseLockRecord naming the lock tag and lock mode;
 * the lock is granted to the dummy PGPROC of prepared transaction "xid".
 * The "info" argument is not used.
 *
 * Because this function is run at db startup, re-acquiring the locks should
 * never conflict with running transactions because there are none.  We
 * assume that the lock state represented by the stored 2PC files is legal.
 *
 * When switching from Hot Standby mode to normal operation, the locks will
 * be already held by the startup process. The locks are acquired for the new
 * procs without checking for conflicts, so we don't get a conflict between the
 * startup process and the dummy procs, even though we will momentarily have
 * a situation where two procs are holding the same AccessExclusiveLock,
 * which isn't normally possible because of the conflict. If we're in standby
 * mode, but a recovery snapshot hasn't been established yet, it's possible
 * that some but not all of the locks are already held by the startup process.
 *
 * This approach is simple, but also a bit dangerous, because if there isn't
 * enough shared memory to acquire the locks, an error will be thrown, which
 * is promoted to FATAL and recovery will abort, bringing down postmaster.
 * A safer approach would be to transfer the locks like we do in
 * AtPrepare_Locks, but then again, in hot standby mode it's possible for
 * read-only backends to use up all the shared lock memory anyway, so that
 * replaying the WAL record that needs to acquire a lock will throw an error
 * and PANIC anyway.
 */
void
lock_twophase_recover(TransactionId xid, uint16 info,
                      void *recdata, uint32 len)
{// #lizard forgives
    TwoPhaseLockRecord *rec = (TwoPhaseLockRecord *) recdata;
    PGPROC       *proc = TwoPhaseGetDummyProc(xid);
    LOCKTAG    *locktag;
    LOCKMODE    lockmode;
    LOCKMETHODID lockmethodid;
    LOCK       *lock;
    PROCLOCK   *proclock;
    PROCLOCKTAG proclocktag;
    bool        found;
    uint32        hashcode;
    uint32        proclock_hashcode;
    int            partition;
    LWLock       *partitionLock;
    LockMethod    lockMethodTable;

    Assert(len == sizeof(TwoPhaseLockRecord));
    locktag = &rec->locktag;
    lockmode = rec->lockmode;
    lockmethodid = locktag->locktag_lockmethodid;

    /* Reject a record whose lock method is out of range. */
    if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
        elog(ERROR, "unrecognized lock method: %d", lockmethodid);
    lockMethodTable = LockMethods[lockmethodid];

    /* The tag's hash code selects the partition and its LWLock. */
    hashcode = LockTagHashCode(locktag);
    partition = LockHashPartition(hashcode);
    partitionLock = LockHashPartitionLock(hashcode);

    LWLockAcquire(partitionLock, LW_EXCLUSIVE);

    /*
     * Find or create a lock with this tag.  HASH_ENTER_NULL is used so that
     * running out of shared memory yields a NULL result, which we report
     * ourselves below.
     */
    lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
                                                (void *) locktag,
                                                hashcode,
                                                HASH_ENTER_NULL,
                                                &found);
    if (!lock)
    {
        LWLockRelease(partitionLock);
        ereport(ERROR,
                (errcode(ERRCODE_OUT_OF_MEMORY),
                 errmsg("out of shared memory"),
                 errhint("You might need to increase max_locks_per_transaction.")));
    }

    /*
     * if it's a new lock object, initialize it
     */
    if (!found)
    {
        lock->grantMask = 0;
        lock->waitMask = 0;
        SHMQueueInit(&(lock->procLocks));
        ProcQueueInit(&(lock->waitProcs));
        lock->nRequested = 0;
        lock->nGranted = 0;
        MemSet(lock->requested, 0, sizeof(int) * MAX_LOCKMODES);
        MemSet(lock->granted, 0, sizeof(int) * MAX_LOCKMODES);
        LOCK_PRINT("lock_twophase_recover: new", lock, lockmode);
    }
    else
    {
        LOCK_PRINT("lock_twophase_recover: found", lock, lockmode);
        Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
        Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
        Assert(lock->nGranted <= lock->nRequested);
    }

    /*
     * Create the hash key for the proclock table.
     */
    proclocktag.myLock = lock;
    proclocktag.myProc = proc;

    proclock_hashcode = ProcLockHashCode(&proclocktag, hashcode);

    /*
     * Find or create a proclock entry with this tag
     */
    proclock = (PROCLOCK *) hash_search_with_hash_value(LockMethodProcLockHash,
                                                        (void *) &proclocktag,
                                                        proclock_hashcode,
                                                        HASH_ENTER_NULL,
                                                        &found);
    if (!proclock)
    {
        /* Oops, not enough shmem for the proclock */
        if (lock->nRequested == 0)
        {
            /*
             * There are no other requestors of this lock, so garbage-collect
             * the lock object.  We *must* do this to avoid a permanent leak
             * of shared memory, because there won't be anything to cause
             * anyone to release the lock object later.
             */
            Assert(SHMQueueEmpty(&(lock->procLocks)));
            if (!hash_search_with_hash_value(LockMethodLockHash,
                                             (void *) &(lock->tag),
                                             hashcode,
                                             HASH_REMOVE,
                                             NULL))
                elog(PANIC, "lock table corrupted");
        }
        LWLockRelease(partitionLock);
        ereport(ERROR,
                (errcode(ERRCODE_OUT_OF_MEMORY),
                 errmsg("out of shared memory"),
                 errhint("You might need to increase max_locks_per_transaction.")));
    }

    /*
     * If new, initialize the new entry
     */
    if (!found)
    {
        /* The dummy proc is expected not to belong to any lock group. */
        Assert(proc->lockGroupLeader == NULL);
        proclock->groupLeader = proc;
        proclock->holdMask = 0;
        proclock->releaseMask = 0;
        /* Add proclock to appropriate lists */
        SHMQueueInsertBefore(&lock->procLocks, &proclock->lockLink);
        SHMQueueInsertBefore(&(proc->myProcLocks[partition]),
                             &proclock->procLink);
        PROCLOCK_PRINT("lock_twophase_recover: new", proclock);
    }
    else
    {
        PROCLOCK_PRINT("lock_twophase_recover: found", proclock);
        /* Whatever the proclock holds must also be granted on the lock. */
        Assert((proclock->holdMask & ~lock->grantMask) == 0);
    }

    /*
     * lock->nRequested and lock->requested[] count the total number of
     * requests, whether granted or waiting, so increment those immediately.
     */
    lock->nRequested++;
    lock->requested[lockmode]++;
    Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));

    /*
     * We shouldn't already hold the desired lock.
     */
    if (proclock->holdMask & LOCKBIT_ON(lockmode))
        elog(ERROR, "lock %s on object %u/%u/%u is already held",
             lockMethodTable->lockModeNames[lockmode],
             lock->tag.locktag_field1, lock->tag.locktag_field2,
             lock->tag.locktag_field3);

    /*
     * We ignore any possible conflicts and just grant ourselves the lock. Not
     * only because we don't bother, but also to avoid deadlocks when
     * switching from standby to normal mode. See function comment.
     */
    GrantLock(lock, proclock, lockmode);

    /*
     * Bump strong lock count, to make sure any fast-path lock requests won't
     * be granted without consulting the primary lock table.  The counter
     * array is protected by its own spinlock.
     */
    if (ConflictsWithRelationFastPath(&lock->tag, lockmode))
    {
        uint32        fasthashcode = FastPathStrongLockHashPartition(hashcode);

        SpinLockAcquire(&FastPathStrongRelationLocks->mutex);
        FastPathStrongRelationLocks->count[fasthashcode]++;
        SpinLockRelease(&FastPathStrongRelationLocks->mutex);
    }

    LWLockRelease(partitionLock);
}

/*
 * lock_twophase_standby_recover
 *        Hot-standby counterpart of lock_twophase_recover: re-acquire a
 *        prepared transaction's lock when starting up into hot standby.
 *
 * recdata points to a TwoPhaseLockRecord.  Only AccessExclusiveLocks on
 * relations are acted upon, by handing them to
 * StandbyAcquireAccessExclusiveLock(); every other record is ignored once
 * its lock method has been validated.  "info" is not used.
 */
void
lock_twophase_standby_recover(TransactionId xid, uint16 info,
                              void *recdata, uint32 len)
{
    TwoPhaseLockRecord *rec = (TwoPhaseLockRecord *) recdata;
    LOCKTAG    *tag;
    LOCKMETHODID methodid;

    Assert(len == sizeof(TwoPhaseLockRecord));
    tag = &rec->locktag;
    methodid = tag->locktag_lockmethodid;

    if (methodid <= 0 || methodid >= lengthof(LockMethods))
        elog(ERROR, "unrecognized lock method: %d", methodid);

    /* nothing to do unless this is an AccessExclusiveLock on a relation */
    if (rec->lockmode != AccessExclusiveLock)
        return;
    if (tag->locktag_type != LOCKTAG_RELATION)
        return;

    StandbyAcquireAccessExclusiveLock(xid,
                                      tag->locktag_field1,    /* dboid */
                                      tag->locktag_field2);    /* reloid */
}


/*
 * lock_twophase_postcommit
 *        2PC callback for COMMIT PREPARED.
 *
 * recdata points to a TwoPhaseLockRecord.  The lock it describes is looked
 * up again on behalf of the prepared transaction's dummy PGPROC and
 * released.  "info" is not used.
 */
void
lock_twophase_postcommit(TransactionId xid, uint16 info,
                         void *recdata, uint32 len)
{
    TwoPhaseLockRecord *rec = (TwoPhaseLockRecord *) recdata;
    PGPROC     *dummy = TwoPhaseGetDummyProc(xid);
    LOCKMETHODID methodid;

    Assert(len == sizeof(TwoPhaseLockRecord));
    methodid = rec->locktag.locktag_lockmethodid;

    if (methodid <= 0 || methodid >= lengthof(LockMethods))
        elog(ERROR, "unrecognized lock method: %d", methodid);

    LockRefindAndRelease(LockMethods[methodid], dummy,
                         &rec->locktag, rec->lockmode, true);
}

/*
 * 2PC processing routine for ROLLBACK PREPARED case.
 *
 * This is actually just the same as the COMMIT case: all four arguments are
 * passed through unchanged to lock_twophase_postcommit().
 */
void
lock_twophase_postabort(TransactionId xid, uint16 info,
                        void *recdata, uint32 len)
{
    lock_twophase_postcommit(xid, info, recdata, len);
}

/*
 *        VirtualXactLockTableInsert
 *
 *        Record, via the fast-path fields of our own PGPROC, that we hold
 *        the lock on the given vxid.  No other locker can exist yet, since
 *        the vxid has not been advertised through the ProcArray.
 *
 *        MyProc->fpLocalTransactionId normally mirrors MyProc->lxid, but both
 *        are kept: lxid is set and cleared without locking and is examined by
 *        procarray.c, whereas fpLocalTransactionId is protected by
 *        backendLock and belongs to the locking subsystem alone.  Keeping
 *        them apart makes it easier to rule out odd race conditions.
 *
 *        The lock is not entered in the local lock table, because it is only
 *        ever released at end of transaction; LockReleaseAll() calls
 *        VirtualXactLockTableCleanup() for that.
 */
void
VirtualXactLockTableInsert(VirtualTransactionId vxid)
{
    PGPROC     *self;

    Assert(VirtualTransactionIdIsValid(vxid));

    self = MyProc;

    LWLockAcquire(&self->backendLock, LW_EXCLUSIVE);

    /* we must be the vxid's backend and must not hold a vxid lock yet */
    Assert(self->backendId == vxid.backendId);
    Assert(self->fpLocalTransactionId == InvalidLocalTransactionId);
    Assert(self->fpVXIDLock == false);

    self->fpVXIDLock = true;
    self->fpLocalTransactionId = vxid.localTransactionId;

    LWLockRelease(&self->backendLock);
}

/*
 *        VirtualXactLockTableCleanup
 *
 *        Reset this backend's fast-path VXID lock fields.  If the VXID lock
 *        had meanwhile been materialized in the main lock table, release it
 *        there as well, which unblocks anyone waiting on it.
 */
void
VirtualXactLockTableCleanup(void)
{
    bool        was_fastpath;
    LocalTransactionId old_lxid;
    VirtualTransactionId vxid;
    LOCKTAG     tag;

    Assert(MyProc->backendId != InvalidBackendId);

    /* Snapshot and clear the shared-memory fields under backendLock. */
    LWLockAcquire(&MyProc->backendLock, LW_EXCLUSIVE);

    was_fastpath = MyProc->fpVXIDLock;
    old_lxid = MyProc->fpLocalTransactionId;
    MyProc->fpVXIDLock = false;
    MyProc->fpLocalTransactionId = InvalidLocalTransactionId;

    LWLockRelease(&MyProc->backendLock);

    /*
     * Only the combination "fpVXIDLock already cleared, but
     * fpLocalTransactionId still valid" means that someone moved the lock
     * into the main lock table.  In every other case we are finished.
     */
    if (was_fastpath || !LocalTransactionIdIsValid(old_lxid))
        return;

    vxid.backendId = MyBackendId;
    vxid.localTransactionId = old_lxid;
    SET_LOCKTAG_VIRTUALTRANSACTION(tag, vxid);

    LockRefindAndRelease(LockMethods[DEFAULT_LOCKMETHOD], MyProc,
                         &tag, ExclusiveLock, false);
}

/*
 *        VirtualXactLock
 *
 * If wait = true, wait until the given VXID has been released, and then
 * return true.
 *
 * If wait = false, just check whether the VXID is still running, and return
 * true or false.
 *
 * In both cases, true is returned immediately when no PGPROC exists for
 * vxid.backendId or when that PGPROC is no longer running the given local
 * transaction.  An ERROR is raised if shared memory runs out while setting
 * up the lock table entry needed for waiting.
 */
bool
VirtualXactLock(VirtualTransactionId vxid, bool wait)
{
    LOCKTAG        tag;
    PGPROC       *proc;

    Assert(VirtualTransactionIdIsValid(vxid));

    SET_LOCKTAG_VIRTUALTRANSACTION(tag, vxid);

    /*
     * If a lock table entry must be made, this is the PGPROC on whose behalf
     * it must be done.  Note that the transaction might end or the PGPROC
     * might be reassigned to a new backend before we get around to examining
     * it, but it doesn't matter.  If we find upon examination that the
     * relevant lxid is no longer running here, that's enough to prove that
     * it's no longer running anywhere.
     */
    proc = BackendIdGetProc(vxid.backendId);
    if (proc == NULL)
        return true;

    /*
     * We must acquire this lock before checking the backendId and lxid
     * against the ones we're waiting for.  The target backend will only set
     * or clear lxid while holding this lock.
     */
    LWLockAcquire(&proc->backendLock, LW_EXCLUSIVE);

    /* If the transaction has ended, our work here is done. */
    if (proc->backendId != vxid.backendId
        || proc->fpLocalTransactionId != vxid.localTransactionId)
    {
        LWLockRelease(&proc->backendLock);
        return true;
    }

    /*
     * If we aren't asked to wait, there's no need to set up a lock table
     * entry.  The transaction is still in progress, so just return false.
     */
    if (!wait)
    {
        LWLockRelease(&proc->backendLock);
        return false;
    }

    /*
     * OK, we're going to need to sleep on the VXID.  But first, we must set
     * up the primary lock table entry, if needed (ie, convert the proc's
     * fast-path lock on its VXID to a regular lock).
     */
    if (proc->fpVXIDLock)
    {
        PROCLOCK   *proclock;
        uint32        hashcode;
        LWLock       *partitionLock;

        hashcode = LockTagHashCode(&tag);

        /* The partition lock is taken while still holding backendLock. */
        partitionLock = LockHashPartitionLock(hashcode);
        LWLockAcquire(partitionLock, LW_EXCLUSIVE);

        /* Create the entry on behalf of the target proc, not ourselves. */
        proclock = SetupLockInTable(LockMethods[DEFAULT_LOCKMETHOD], proc,
                                    &tag, hashcode, ExclusiveLock);
        if (!proclock)
        {
            /* Release both LWLocks before reporting the failure. */
            LWLockRelease(partitionLock);
            LWLockRelease(&proc->backendLock);
            ereport(ERROR,
                    (errcode(ERRCODE_OUT_OF_MEMORY),
                     errmsg("out of shared memory"),
                     errhint("You might need to increase max_locks_per_transaction.")));
        }
        GrantLock(proclock->tag.myLock, proclock, ExclusiveLock);

        LWLockRelease(partitionLock);

        /*
         * The lock now lives in the main lock table.  fpLocalTransactionId is
         * deliberately left untouched.
         */
        proc->fpVXIDLock = false;
    }

    /* Done examining and updating the target's fast-path VXID fields */
    LWLockRelease(&proc->backendLock);

    /*
     * Time to wait.  Request ShareLock on the VXID and release it again as
     * soon as it has been obtained.
     */
    (void) LockAcquire(&tag, ShareLock, false, false);

    LockRelease(&tag, ShareLock, false);
    return true;
}

/*
 * LockWaiterCount
 *
 * Find the number of lock requester on this locktag
 */
int
LockWaiterCount(const LOCKTAG *locktag)
{
    LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
    LOCK       *lock;
    bool        found;
    uint32        hashcode;
    LWLock       *partitionLock;
    int            waiters = 0;

    if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
        elog(ERROR, "unrecognized lock method: %d", lockmethodid);

    hashcode = LockTagHashCode(locktag);
    partitionLock = LockHashPartitionLock(hashcode);
    LWLockAcquire(partitionLock, LW_EXCLUSIVE);

    lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
                                                (const void *) locktag,
                                                hashcode,
                                                HASH_FIND,
                                                &found);
    if (found)
    {
        Assert(lock != NULL);
        waiters = lock->nRequested;
    }
    LWLockRelease(partitionLock);

    return waiters;
}

#ifdef __OPENTENBASE__
void StillHoldlock(void)
{
    bool hold = false;
    int  i    = 0;
    PROCLOCK   *proclock;
    HASH_SEQ_STATUS seqstat;
    
    for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
    {
        LWLockAcquire(LockHashPartitionLockByIndex(i), LW_SHARED);
    }
    
    
    /* Now scan the tables to copy the data */
    hash_seq_init(&seqstat, LockMethodProcLockHash);

    while ((proclock = (PROCLOCK *) hash_seq_search(&seqstat)))
    {
        PGPROC       *proc = proclock->tag.myProc;
        
        if (proc == MyProc)
        {
            hold = true;
            break;
        }
    }
    
    for (i = NUM_LOCK_PARTITIONS; --i >= 0;)
    {
        LWLockRelease(LockHashPartitionLockByIndex(i));
    }

    if (hold)
    {
        abort();
    }
}
#endif

